XRootD
XrdCl::XRootDTransport Class Reference

XRootD transport handler.

#include <XrdClXRootDTransport.hh>

Public Member Functions

 XRootDTransport ()
 Constructor.
 
 ~XRootDTransport ()
 Destructor.
 
virtual void DecFileInstCnt (AnyObject &channelData)
 Decrement file object instance count bound to this channel.
 
virtual void Disconnect (AnyObject &channelData, uint16_t subStreamId)
 The stream has been disconnected, do the cleanups.
 
virtual void FinalizeChannel (AnyObject &channelData)
 Finalize channel.
 
virtual URL GetBindPreference (const URL &url, AnyObject &channelData)
 Get bind preference for the next data stream.
 
virtual XRootDStatus GetBody (Message &message, Socket *socket)
 
virtual XRootDStatus GetHeader (Message &message, Socket *socket)
 
virtual XRootDStatus GetMore (Message &message, Socket *socket)
 
virtual Status GetSignature (Message *toSign, Message *&sign, AnyObject &channelData)
 Get signature for given message.
 
virtual Status GetSignature (Message *toSign, Message *&sign, XRootDChannelInfo *info)
 Get signature for given message.
 
virtual XRootDStatus HandShake (HandShakeData *handShakeData, AnyObject &channelData)
 HandShake.
 
virtual bool HandShakeDone (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual void InitializeChannel (const URL &url, AnyObject &channelData)
 Initialize channel.
 
virtual Status IsStreamBroken (time_t inactiveTime, AnyObject &channelData)
 
virtual bool IsStreamTTLElapsed (time_t time, AnyObject &channelData)
 Check if the stream should be disconnected.
 
virtual uint32_t MessageReceived (Message &msg, uint16_t subStream, AnyObject &channelData)
 Check if the message invokes a stream action.
 
virtual void MessageSent (Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
 Notify the transport about a message having been sent.
 
virtual PathID Multiplex (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual PathID MultiplexSubStream (Message *msg, AnyObject &channelData, PathID *hint=0)
 
virtual bool NeedControlConnection ()
 
virtual bool NeedEncryption (HandShakeData *handShakeData, AnyObject &channelData)
 
virtual Status Query (uint16_t query, AnyObject &result, AnyObject &channelData)
 Query the channel.
 
virtual uint16_t SubStreamNumber (AnyObject &channelData)
 Return the number of substreams per stream that should be created.
 
virtual void WaitBeforeExit ()
 Wait until the program can safely exit.
 
- Public Member Functions inherited from XrdCl::TransportHandler
virtual ~TransportHandler ()
 

Static Public Member Functions

static void GenerateDescription (char *msg, std::ostringstream &o)
 Get the description of a message.
 
static void LogErrorResponse (const Message &msg)
 Log server error response.
 
static XRootDStatus MarshallRequest (char *msg)
 Marshal the outgoing message.
 
static XRootDStatus MarshallRequest (Message *msg)
 Marshal the outgoing message.
 
static uint16_t NbConnectedStrm (AnyObject &channelData)
 Number of currently connected data streams.
 
static void SetDescription (Message *msg)
 Set the description of a message.
 
static XRootDStatus UnMarchalStatusMore (Message &msg)
 Unmarshall the correction-segment of the status response for pgwrite.
 
static XRootDStatus UnMarshallBody (Message *msg, uint16_t reqType)
 Unmarshall the body of the incoming message.
 
static void UnMarshallHeader (Message &msg)
 Unmarshall the header of the incoming message.
 
static XRootDStatus UnMarshallRequest (Message *msg)
 
static XRootDStatus UnMarshalStatusBody (Message &msg, uint16_t reqType)
 Unmarshall the body of the status response.
 

Friends

struct PluginUnloadHandler
 

Additional Inherited Members

- Public Types inherited from XrdCl::TransportHandler
enum  StreamAction {
  NoAction = 0x0000 ,
  DigestMsg = 0x0001 ,
  AbortStream = 0x0002 ,
  CloseStream = 0x0004 ,
  ResumeStream = 0x0008 ,
  HoldStream = 0x0010 ,
  RequestClose = 0x0020
}
 Stream actions that may be triggered by incoming control messages. More...
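
Because the StreamAction values are distinct bit flags, several actions can be combined in a single 32-bit value, which is presumably how the uint32_t returned by MessageReceived is meant to be read. The sketch below copies the enumerator values from the listing above into a stand-alone enum so that it compiles on its own. The helpers HasAction and DescribeActions are hypothetical and are not part of XrdCl.

```cpp
#include <cstdint>
#include <string>

// Stand-alone copy of the enumerator values listed above (illustration only).
enum StreamAction : uint32_t
{
  NoAction     = 0x0000,
  DigestMsg    = 0x0001,
  AbortStream  = 0x0002,
  CloseStream  = 0x0004,
  ResumeStream = 0x0008,
  HoldStream   = 0x0010,
  RequestClose = 0x0020
};

// Hypothetical helper: true if `action` is set in the combined mask.
inline bool HasAction( uint32_t mask, StreamAction action )
{
  return ( mask & action ) != 0;
}

// Hypothetical helper: names of all actions set in the mask, joined by '|'.
inline std::string DescribeActions( uint32_t mask )
{
  if( mask == NoAction ) return "NoAction";

  std::string out;
  auto add = [&]( StreamAction a, const char *name )
  {
    if( !HasAction( mask, a ) ) return;
    if( !out.empty() ) out += "|";
    out += name;
  };

  add( DigestMsg,    "DigestMsg" );
  add( AbortStream,  "AbortStream" );
  add( CloseStream,  "CloseStream" );
  add( ResumeStream, "ResumeStream" );
  add( HoldStream,   "HoldStream" );
  add( RequestClose, "RequestClose" );
  return out;
}
```

With these definitions, DescribeActions( DigestMsg | CloseStream ) yields the string "DigestMsg|CloseStream".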
 

Detailed Description

XRootD transport handler.

Definition at line 47 of file XrdClXRootDTransport.hh.

Constructor & Destructor Documentation

◆ XRootDTransport()

XrdCl::XRootDTransport::XRootDTransport ( )

Constructor.

Definition at line 291 of file XrdClXRootDTransport.cc.

291 :
292 pSecUnloadHandler( new PluginUnloadHandler() )
293 {
294 }

◆ ~XRootDTransport()

XrdCl::XRootDTransport::~XRootDTransport ( )

Destructor.

Definition at line 299 of file XrdClXRootDTransport.cc.

300 {
301 delete pSecUnloadHandler; pSecUnloadHandler = 0;
302 }

Member Function Documentation

◆ DecFileInstCnt()

void XrdCl::XRootDTransport::DecFileInstCnt ( AnyObject & channelData)
virtual

Decrement file object instance count bound to this channel.

Implements XrdCl::TransportHandler.

Definition at line 1743 of file XrdClXRootDTransport.cc.

1744 {
1745 XRootDChannelInfo *info = 0;
1746 channelData.Get( info );
1747 if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1748 info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1749 }

References XrdCl::XRootDChannelInfo::finstcnt, and XrdCl::AnyObject::Get().
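
The listing checks the counter and then decrements it in two separate atomic operations. Whether two threads can reach this point concurrently depends on how callers serialize access, which this page does not show. If they could, both might pass the check while the counter is 1, and the counter would then drop below zero (or wrap, for an unsigned type). A minimal sketch of a decrement that cannot do that is a compare-and-exchange loop. It assumes a std::atomic<uint32_t> counter (the actual type of finstcnt is an assumption here), and DecrementIfPositive is a hypothetical name, not an XrdCl function.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical helper: decrement `counter` by one unless it is already zero.
// Returns true if a decrement took place.
inline bool DecrementIfPositive( std::atomic<uint32_t> &counter )
{
  uint32_t current = counter.load( std::memory_order_relaxed );
  while( current > 0 )
  {
    // On failure compare_exchange_weak stores the latest value in `current`,
    // so the loop re-checks the "> 0" condition before trying again.
    if( counter.compare_exchange_weak( current, current - 1,
                                       std::memory_order_relaxed ) )
      return true;
  }
  return false;
}
```

The check and the update happen in one atomic step, so a concurrent caller either sees the old value and retries or sees zero and leaves the counter alone.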

◆ Disconnect()

void XrdCl::XRootDTransport::Disconnect ( AnyObject & channelData,
uint16_t subStreamId )
virtual

The stream has been disconnected, do the cleanups.

Implements XrdCl::TransportHandler.

Definition at line 1485 of file XrdClXRootDTransport.cc.

1487 {
1488 XRootDChannelInfo *info = 0;
1489 channelData.Get( info );
1490 XrdSysMutexHelper scopedLock( info->mutex );
1491
1492 CleanUpProtection( info );
1493
1494 if( !info->stream.empty() )
1495 {
1496 XRootDStreamInfo &sInfo = info->stream[subStreamId];
1497 sInfo.status = XRootDStreamInfo::Disconnected;
1498 }
1499
1500 if( subStreamId == 0 )
1501 {
1502 info->sidManager->ReleaseAllTimedOut();
1503 info->sentOpens.clear();
1504 info->sentCloses.clear();
1505 info->openFiles = 0;
1506 info->waitBarrier = 0;
1507 }
1508 }

References XrdCl::XRootDStreamInfo::Disconnected, XrdCl::AnyObject::Get(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::waitBarrier.

◆ FinalizeChannel()

void XrdCl::XRootDTransport::FinalizeChannel ( AnyObject & channelData)
virtual

Finalize channel.

Implements XrdCl::TransportHandler.

Definition at line 460 of file XrdClXRootDTransport.cc.

461 {
462 }

◆ GenerateDescription()

void XrdCl::XRootDTransport::GenerateDescription ( char * msg,
std::ostringstream & o )
static

Get the description of a message.

Definition at line 2904 of file XrdClXRootDTransport.cc.

2905 {
2906 Log *log = DefaultEnv::GetLog();
2907 if( log->GetLevel() < Log::ErrorMsg )
2908 return;
2909
2910 ClientRequestHdr *req = (ClientRequestHdr *)msg;
2911 switch( req->requestid )
2912 {
2913 //------------------------------------------------------------------------
2914 // kXR_open
2915 //------------------------------------------------------------------------
2916 case kXR_open:
2917 {
2918 ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
2919 o << "kXR_open (";
2920 char *fn = GetDataAsString( msg );
2921 o << "file: " << fn << ", ";
2922 delete [] fn;
2923 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
2924 o << std::setbase(10);
2925 o << "flags: ";
2926 if( sreq->options == 0 )
2927 o << "none";
2928 else
2929 {
2930 if( sreq->options & kXR_compress )
2931 o << "kXR_compress ";
2932 if( sreq->options & kXR_delete )
2933 o << "kXR_delete ";
2934 if( sreq->options & kXR_force )
2935 o << "kXR_force ";
2936 if( sreq->options & kXR_mkpath )
2937 o << "kXR_mkpath ";
2938 if( sreq->options & kXR_new )
2939 o << "kXR_new ";
2940 if( sreq->options & kXR_nowait )
2941 o << "kXR_nowait ";
2942 if( sreq->options & kXR_open_apnd )
2943 o << "kXR_open_apnd ";
2944 if( sreq->options & kXR_open_read )
2945 o << "kXR_open_read ";
2946 if( sreq->options & kXR_open_updt )
2947 o << "kXR_open_updt ";
2948 if( sreq->options & kXR_open_wrto )
2949 o << "kXR_open_wrto ";
2950 if( sreq->options & kXR_posc )
2951 o << "kXR_posc ";
2952 if( sreq->options & kXR_prefname )
2953 o << "kXR_prefname ";
2954 if( sreq->options & kXR_refresh )
2955 o << "kXR_refresh ";
2956 if( sreq->options & kXR_4dirlist )
2957 o << "kXR_4dirlist ";
2958 if( sreq->options & kXR_replica )
2959 o << "kXR_replica ";
2960 if( sreq->options & kXR_seqio )
2961 o << "kXR_seqio ";
2962 if( sreq->options & kXR_async )
2963 o << "kXR_async ";
2964 if( sreq->options & kXR_retstat )
2965 o << "kXR_retstat ";
2966 }
2967 o << ")";
2968 break;
2969 }
2970
2971 //------------------------------------------------------------------------
2972 // kXR_close
2973 //------------------------------------------------------------------------
2974 case kXR_close:
2975 {
2976 ClientCloseRequest *sreq = (ClientCloseRequest *)msg;
2977 o << "kXR_close (";
2978 o << "handle: " << FileHandleToStr( sreq->fhandle );
2979 o << ")";
2980 break;
2981 }
2982
2983 //------------------------------------------------------------------------
2984 // kXR_stat
2985 //------------------------------------------------------------------------
2986 case kXR_stat:
2987 {
2988 ClientStatRequest *sreq = (ClientStatRequest *)msg;
2989 o << "kXR_stat (";
2990 if( sreq->dlen )
2991 {
2992 char *fn = GetDataAsString( msg );;
2993 o << "path: " << fn << ", ";
2994 delete [] fn;
2995 }
2996 else
2997 {
2998 o << "handle: " << FileHandleToStr( sreq->fhandle );
2999 o << ", ";
3000 }
3001 o << "flags: ";
3002 if( sreq->options == 0 )
3003 o << "none";
3004 else
3005 {
3006 if( sreq->options & kXR_vfs )
3007 o << "kXR_vfs";
3008 }
3009 o << ")";
3010 break;
3011 }
3012
3013 //------------------------------------------------------------------------
3014 // kXR_read
3015 //------------------------------------------------------------------------
3016 case kXR_read:
3017 {
3018 ClientReadRequest *sreq = (ClientReadRequest *)msg;
3019 o << "kXR_read (";
3020 o << "handle: " << FileHandleToStr( sreq->fhandle );
3021 o << std::setbase(10);
3022 o << ", ";
3023 o << "offset: " << sreq->offset << ", ";
3024 o << "size: " << sreq->rlen << ")";
3025 break;
3026 }
3027
3028 //------------------------------------------------------------------------
3029 // kXR_pgread
3030 //------------------------------------------------------------------------
3031 case kXR_pgread:
3032 {
3033 ClientPgReadRequest *sreq = (ClientPgReadRequest *)msg;
3034 o << "kXR_pgread (";
3035 o << "handle: " << FileHandleToStr( sreq->fhandle );
3036 o << std::setbase(10);
3037 o << ", ";
3038 o << "offset: " << sreq->offset << ", ";
3039 o << "size: " << sreq->rlen << ")";
3040 break;
3041 }
3042
3043 //------------------------------------------------------------------------
3044 // kXR_write
3045 //------------------------------------------------------------------------
3046 case kXR_write:
3047 {
3048 ClientWriteRequest *sreq = (ClientWriteRequest *)msg;
3049 o << "kXR_write (";
3050 o << "handle: " << FileHandleToStr( sreq->fhandle );
3051 o << std::setbase(10);
3052 o << ", ";
3053 o << "offset: " << sreq->offset << ", ";
3054 o << "size: " << sreq->dlen << ")";
3055 break;
3056 }
3057
3058 //------------------------------------------------------------------------
3059 // kXR_pgwrite
3060 //------------------------------------------------------------------------
3061 case kXR_pgwrite:
3062 {
3063 ClientPgWriteRequest *sreq = (ClientPgWriteRequest *)msg;
3064 o << "kXR_pgwrite (";
3065 o << "handle: " << FileHandleToStr( sreq->fhandle );
3066 o << std::setbase(10);
3067 o << ", ";
3068 o << "offset: " << sreq->offset << ", ";
3069 o << "size: " << sreq->dlen << ")";
3070 break;
3071 }
3072
3073 //------------------------------------------------------------------------
3074 // kXR_fattr
3075 //------------------------------------------------------------------------
3076 case kXR_fattr:
3077 {
3078 ClientFattrRequest *sreq = (ClientFattrRequest *)msg;
3079 int nattr = sreq->numattr;
3080 int options = sreq->options;
3081 o << "kXR_fattr";
3082 switch (sreq->subcode) {
3083 case kXR_fattrGet:
3084 o << "Get";
3085 break;
3086 case kXR_fattrSet:
3087 o << "Set";
3088 break;
3089 case kXR_fattrList:
3090 o << "List";
3091 break;
3092 case kXR_fattrDel:
3093 o << "Delete";
3094 break;
3095 default:
3096 o << " unknown subcode: " << sreq->subcode;
3097 break;
3098 }
3099 o << " (handle: " << FileHandleToStr( sreq->fhandle );
3100 o << std::setbase(10);
3101 if (nattr)
3102 o << ", numattr: " << nattr;
3103 if (options) {
3104 o << ", options: ";
3105 if (options & 0x01)
3106 o << "new";
3107 if (options & 0x10)
3108 o << "list values";
3109 }
3110 o << ", total size: " << req->dlen << ")";
3111 break;
3112 }
3113
3114 //------------------------------------------------------------------------
3115 // kXR_sync
3116 //------------------------------------------------------------------------
3117 case kXR_sync:
3118 {
3119 ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3120 o << "kXR_sync (";
3121 o << "handle: " << FileHandleToStr( sreq->fhandle );
3122 o << ")";
3123 break;
3124 }
3125
3126 //------------------------------------------------------------------------
3127 // kXR_truncate
3128 //------------------------------------------------------------------------
3129 case kXR_truncate:
3130 {
3131 ClientTruncateRequest *sreq = (ClientTruncateRequest *)msg;
3132 o << "kXR_truncate (";
3133 if( !sreq->dlen )
3134 o << "handle: " << FileHandleToStr( sreq->fhandle );
3135 else
3136 {
3137 char *fn = GetDataAsString( msg );
3138 o << "file: " << fn;
3139 delete [] fn;
3140 }
3141 o << std::setbase(10);
3142 o << ", ";
3143 o << "offset: " << sreq->offset;
3144 o << ")";
3145 break;
3146 }
3147
3148 //------------------------------------------------------------------------
3149 // kXR_readv
3150 //------------------------------------------------------------------------
3151 case kXR_readv:
3152 {
3153 unsigned char *fhandle = 0;
3154 o << "kXR_readv (";
3155
3156 o << "handle: ";
3157 readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3158 fhandle = dataChunk[0].fhandle;
3159 if( fhandle )
3160 o << FileHandleToStr( fhandle );
3161 else
3162 o << "unknown";
3163 o << ", ";
3164 o << std::setbase(10);
3165 o << "chunks: [";
3166 uint64_t size = 0;
3167 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3168 {
3169 size += dataChunk[i].rlen;
3170 o << "(offset: " << dataChunk[i].offset;
3171 o << ", size: " << dataChunk[i].rlen << "); ";
3172 }
3173 o << "], ";
3174 o << "total size: " << size << ")";
3175 break;
3176 }
3177
3178 //------------------------------------------------------------------------
3179 // kXR_writev
3180 //------------------------------------------------------------------------
3181 case kXR_writev:
3182 {
3183 unsigned char *fhandle = 0;
3184 o << "kXR_writev (";
3185
3186 XrdProto::write_list *wrtList =
3187 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3188 uint64_t size = 0;
3189 uint32_t numChunks = 0;
3190 for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3191 {
3192 fhandle = wrtList[i].fhandle;
3193 size += wrtList[i].wlen;
3194 ++numChunks;
3195 }
3196 o << "handle: ";
3197 if( fhandle )
3198 o << FileHandleToStr( fhandle );
3199 else
3200 o << "unknown";
3201 o << ", ";
3202 o << std::setbase(10);
3203 o << "chunks: " << numChunks << ", ";
3204 o << "total size: " << size << ")";
3205 break;
3206 }
3207
3208 //------------------------------------------------------------------------
3209 // kXR_locate
3210 //------------------------------------------------------------------------
3211 case kXR_locate:
3212 {
3213 ClientLocateRequest *sreq = (ClientLocateRequest *)msg;
3214 char *fn = GetDataAsString( msg );;
3215 o << "kXR_locate (";
3216 o << "path: " << fn << ", ";
3217 delete [] fn;
3218 o << "flags: ";
3219 if( sreq->options == 0 )
3220 o << "none";
3221 else
3222 {
3223 if( sreq->options & kXR_refresh )
3224 o << "kXR_refresh ";
3225 if( sreq->options & kXR_prefname )
3226 o << "kXR_prefname ";
3227 if( sreq->options & kXR_nowait )
3228 o << "kXR_nowait ";
3229 if( sreq->options & kXR_force )
3230 o << "kXR_force ";
3231 if( sreq->options & kXR_compress )
3232 o << "kXR_compress ";
3233 }
3234 o << ")";
3235 break;
3236 }
3237
3238 //------------------------------------------------------------------------
3239 // kXR_mv
3240 //------------------------------------------------------------------------
3241 case kXR_mv:
3242 {
3243 ClientMvRequest *sreq = (ClientMvRequest *)msg;
3244 o << "kXR_mv (";
3245 o << "source: ";
3246 o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3247 o << ", ";
3248 o << "destination: ";
3249 o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3250 o << ")";
3251 break;
3252 }
3253
3254 //------------------------------------------------------------------------
3255 // kXR_query
3256 //------------------------------------------------------------------------
3257 case kXR_query:
3258 {
3259 ClientQueryRequest *sreq = (ClientQueryRequest *)msg;
3260 o << "kXR_query (";
3261 o << "code: ";
3262 switch( sreq->infotype )
3263 {
3264 case kXR_Qconfig: o << "kXR_Qconfig"; break;
3265 case kXR_Qckscan: o << "kXR_Qckscan"; break;
3266 case kXR_Qcksum: o << "kXR_Qcksum"; break;
3267 case kXR_Qopaque: o << "kXR_Qopaque"; break;
3268 case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3269 case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3270 case kXR_QPrep: o << "kXR_QPrep"; break;
3271 case kXR_Qspace: o << "kXR_Qspace"; break;
3272 case kXR_QStats: o << "kXR_QStats"; break;
3273 case kXR_Qvisa: o << "kXR_Qvisa"; break;
3274 case kXR_Qxattr: o << "kXR_Qxattr"; break;
3275 default: o << sreq->infotype; break;
3276 }
3277 o << ", ";
3278
3279 if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3280 {
3281 o << "handle: " << FileHandleToStr( sreq->fhandle );
3282 o << ", ";
3283 }
3284
3285 o << "arg length: " << sreq->dlen << ")";
3286 break;
3287 }
3288
3289 //------------------------------------------------------------------------
3290 // kXR_rm
3291 //------------------------------------------------------------------------
3292 case kXR_rm:
3293 {
3294 o << "kXR_rm (";
3295 char *fn = GetDataAsString( msg );;
3296 o << "path: " << fn << ")";
3297 delete [] fn;
3298 break;
3299 }
3300
3301 //------------------------------------------------------------------------
3302 // kXR_mkdir
3303 //------------------------------------------------------------------------
3304 case kXR_mkdir:
3305 {
3306 ClientMkdirRequest *sreq = (ClientMkdirRequest *)msg;
3307 o << "kXR_mkdir (";
3308 char *fn = GetDataAsString( msg );
3309 o << "path: " << fn << ", ";
3310 delete [] fn;
3311 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3312 o << std::setbase(10);
3313 o << "flags: ";
3314 if( sreq->options[0] == 0 )
3315 o << "none";
3316 else
3317 {
3318 if( sreq->options[0] & kXR_mkdirpath )
3319 o << "kXR_mkdirpath";
3320 }
3321 o << ")";
3322 break;
3323 }
3324
3325 //------------------------------------------------------------------------
3326 // kXR_rmdir
3327 //------------------------------------------------------------------------
3328 case kXR_rmdir:
3329 {
3330 o << "kXR_rmdir (";
3331 char *fn = GetDataAsString( msg );
3332 o << "path: " << fn << ")";
3333 delete [] fn;
3334 break;
3335 }
3336
3337 //------------------------------------------------------------------------
3338 // kXR_chmod
3339 //------------------------------------------------------------------------
3340 case kXR_chmod:
3341 {
3342 ClientChmodRequest *sreq = (ClientChmodRequest *)msg;
3343 o << "kXR_chmod (";
3344 char *fn = GetDataAsString( msg );
3345 o << "path: " << fn << ", ";
3346 delete [] fn;
3347 o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3348 break;
3349 }
3350
3351 //------------------------------------------------------------------------
3352 // kXR_ping
3353 //------------------------------------------------------------------------
3354 case kXR_ping:
3355 {
3356 o << "kXR_ping ()";
3357 break;
3358 }
3359
3360 //------------------------------------------------------------------------
3361 // kXR_protocol
3362 //------------------------------------------------------------------------
3363 case kXR_protocol:
3364 {
3365 ClientProtocolRequest *sreq = (ClientProtocolRequest *)msg;
3366 o << "kXR_protocol (";
3367 o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3368 break;
3369 }
3370
3371 //------------------------------------------------------------------------
3372 // kXR_dirlist
3373 //------------------------------------------------------------------------
3374 case kXR_dirlist:
3375 {
3376 o << "kXR_dirlist (";
3377 char *fn = GetDataAsString( msg );;
3378 o << "path: " << fn << ")";
3379 delete [] fn;
3380 break;
3381 }
3382
3383 //------------------------------------------------------------------------
3384 // kXR_set
3385 //------------------------------------------------------------------------
3386 case kXR_set:
3387 {
3388 o << "kXR_set (";
3389 char *fn = GetDataAsString( msg );;
3390 o << "data: " << fn << ")";
3391 delete [] fn;
3392 break;
3393 }
3394
3395 //------------------------------------------------------------------------
3396 // kXR_prepare
3397 //------------------------------------------------------------------------
3398 case kXR_prepare:
3399 {
3400 ClientPrepareRequest *sreq = (ClientPrepareRequest *)msg;
3401 o << "kXR_prepare (";
3402 o << "flags: ";
3403
3404 if( sreq->options == 0 )
3405 o << "none";
3406 else
3407 {
3408 if( sreq->options & kXR_stage )
3409 o << "kXR_stage ";
3410 if( sreq->options & kXR_wmode )
3411 o << "kXR_wmode ";
3412 if( sreq->options & kXR_coloc )
3413 o << "kXR_coloc ";
3414 if( sreq->options & kXR_fresh )
3415 o << "kXR_fresh ";
3416 }
3417
3418 o << ", priority: " << (int) sreq->prty << ", ";
3419
3420 char *fn = GetDataAsString( msg );
3421 char *cursor;
3422 for( cursor = fn; *cursor; ++cursor )
3423 if( *cursor == '\n' ) *cursor = ' ';
3424
3425 o << "paths: " << fn << ")";
3426 delete [] fn;
3427 break;
3428 }
3429
3430 case kXR_chkpoint:
3431 {
3432 ClientChkPointRequest *sreq = (ClientChkPointRequest *)msg;
3433 o << "kXR_chkpoint (";
3434 o << "opcode: ";
3435 if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3436 else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3437 else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3438 else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3439 else if( sreq->opcode == kXR_ckpXeq )
3440 {
3441 o << "kXR_ckpXeq) ";
3442 // In this case our request body will be one of kXR_pgwrite,
3443 // kXR_truncate, kXR_write, or kXR_writev request.
3444 GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3445 }
3446
3447 break;
3448 }
3449
3450 //------------------------------------------------------------------------
3451 // Default
3452 //------------------------------------------------------------------------
3453 default:
3454 {
3455 o << "kXR_unknown (length: " << req->dlen << ")";
3456 break;
3457 }
3458 };
3459 }

References ClientMvRequest::arg1len, ClientProtocolRequest::clientpv, ClientRequestHdr::dlen, ClientMvRequest::dlen, ClientPgWriteRequest::dlen, ClientQueryRequest::dlen, ClientStatRequest::dlen, ClientTruncateRequest::dlen, ClientWriteRequest::dlen, XrdCl::Log::ErrorMsg, ClientCloseRequest::fhandle, ClientFattrRequest::fhandle, ClientPgReadRequest::fhandle, ClientPgWriteRequest::fhandle, ClientQueryRequest::fhandle, ClientReadRequest::fhandle, readahead_list::fhandle, ClientStatRequest::fhandle, ClientSyncRequest::fhandle, ClientTruncateRequest::fhandle, ClientWriteRequest::fhandle, XrdProto::write_list::fhandle, GenerateDescription(), XrdCl::Log::GetLevel(), XrdCl::DefaultEnv::GetLog(), ClientQueryRequest::infotype, kXR_4dirlist, kXR_async, kXR_chkpoint, kXR_chmod, kXR_ckpBegin, kXR_ckpCommit, kXR_ckpQuery, kXR_ckpRollback, kXR_ckpXeq, kXR_close, kXR_coloc, kXR_compress, kXR_delete, kXR_dirlist, kXR_fattr, kXR_fattrDel, kXR_fattrGet, kXR_fattrList, kXR_fattrSet, kXR_force, kXR_fresh, kXR_locate, kXR_mkdir, kXR_mkdirpath, kXR_mkpath, kXR_mv, kXR_new, kXR_nowait, kXR_open, kXR_open_apnd, kXR_open_read, kXR_open_updt, kXR_open_wrto, kXR_pgread, kXR_pgwrite, kXR_ping, kXR_posc, kXR_prefname, kXR_prepare, kXR_protocol, kXR_Qckscan, kXR_Qcksum, kXR_Qconfig, kXR_Qopaque, kXR_Qopaquf, kXR_Qopaqug, kXR_QPrep, kXR_Qspace, kXR_QStats, kXR_query, kXR_Qvisa, kXR_Qxattr, kXR_read, kXR_readv, kXR_refresh, kXR_replica, kXR_retstat, kXR_rm, kXR_rmdir, kXR_seqio, kXR_set, kXR_stage, kXR_stat, kXR_sync, kXR_truncate, kXR_vfs, kXR_wmode, kXR_write, kXR_writev, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientFattrRequest::numattr, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, readahead_list::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, ClientChkPointRequest::opcode, ClientFattrRequest::options, ClientLocateRequest::options, ClientMkdirRequest::options, ClientOpenRequest::options, 
ClientPrepareRequest::options, ClientStatRequest::options, ClientPrepareRequest::prty, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, ClientFattrRequest::subcode, and XrdProto::write_list::wlen.

Referenced by GenerateDescription(), and SetDescription().
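
The kXR_open branch above shows two formatting idioms that recur throughout the function: the mode is printed in octal with std::setbase(8) before the stream is switched back to decimal, and each option bit is tested in turn and its name appended. A compact sketch of the same idea follows. The flag values and names (FlagRead, FlagWrite, FlagCreate) are made up for illustration and are not the kXR_* constants, and DescribeOpen is a hypothetical helper.

```cpp
#include <cstdint>
#include <iomanip>
#include <sstream>
#include <string>

// Illustrative option bits; NOT the real kXR_* values.
enum : uint16_t { FlagRead = 0x01, FlagWrite = 0x02, FlagCreate = 0x04 };

// Hypothetical helper that formats a request the way the listing does:
// octal mode with a leading "0", then one name per option bit that is set.
inline std::string DescribeOpen( const std::string &file,
                                 uint16_t           mode,
                                 uint16_t           options )
{
  std::ostringstream o;
  o << "open (file: " << file << ", ";
  o << "mode: 0" << std::setbase( 8 ) << mode << ", ";
  o << std::setbase( 10 );            // restore decimal for later output
  o << "flags: ";
  if( options == 0 )
    o << "none";
  else
  {
    if( options & FlagRead )   o << "read ";
    if( options & FlagWrite )  o << "write ";
    if( options & FlagCreate ) o << "create ";
  }
  o << ")";
  return o.str();
}
```

For example, DescribeOpen( "/tmp/x", 0644, FlagRead | FlagCreate ) produces "open (file: /tmp/x, mode: 0644, flags: read create )". As in the listing, each flag name carries a trailing space, so one space precedes the closing parenthesis.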

◆ GetBindPreference()

URL XrdCl::XRootDTransport::GetBindPreference ( const URL & url,
AnyObject & channelData )
virtual

Get bind preference for the next data stream.

Implements XrdCl::TransportHandler.

Definition at line 1841 of file XrdClXRootDTransport.cc.

1843 {
1844 XRootDChannelInfo *info = 0;
1845 channelData.Get( info );
1846 if( !bool( info->bindSelector ) )
1847 return url;
1848
1849 return URL( info->bindSelector->Get() );
1850 }

References XrdCl::XRootDChannelInfo::bindSelector, and XrdCl::AnyObject::Get().

◆ GetBody()

XRootDStatus XrdCl::XRootDTransport::GetBody ( Message & message,
Socket * socket )
virtual

Read the message body from the socket. The socket is non-blocking, so the method may be called multiple times; see GetHeader for details.

Parameters
    message  the message buffer containing the header
    socket   the socket
Returns
    stOK & suDone if the whole message has been processed; stOK & suRetry if more data is needed; stError on failure

Implements XrdCl::TransportHandler.

Definition at line 347 of file XrdClXRootDTransport.cc.

348 {
349 //--------------------------------------------------------------------------
350 // Retrieve the body
351 //--------------------------------------------------------------------------
352 size_t leftToBeRead = 0;
353 uint32_t bodySize = 0;
354 ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
355 bodySize = rsphdr->dlen;
356
357 if( message.GetSize() < bodySize + 8 )
358 message.ReAllocate( bodySize + 8 );
359
360 leftToBeRead = bodySize-(message.GetCursor()-8);
361 while( leftToBeRead )
362 {
363 int bytesRead = 0;
364 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
365
366 if( !status.IsOK() || status.code == suRetry )
367 return status;
368
369 leftToBeRead -= bytesRead;
370 message.AdvanceCursor( bytesRead );
371 }
372
373 return XRootDStatus( stOK, suDone );
374 }

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Status::code, ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), XrdCl::stOK, XrdCl::suDone, and XrdCl::suRetry.
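
The loop in the listing is resumable: the message cursor records how many bytes have already arrived, so when the socket has no more data the function returns and a later call continues from the cursor. The stand-alone sketch below mimics that pattern without using any XrdCl types. FakeSocket, ReadResult, Msg and ReadBody are hypothetical stand-ins, and the 8-byte header size is taken from the listing.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

enum class ReadResult { Done, Retry };

// Hypothetical non-blocking socket: hands out at most `chunk` bytes per call
// and reports Retry once the data that has "arrived" so far is used up.
struct FakeSocket
{
  std::vector<char> pending;
  size_t            chunk;

  ReadResult Read( char *dst, size_t wanted, size_t &bytesRead )
  {
    bytesRead = std::min( { wanted, chunk, pending.size() } );
    if( bytesRead == 0 ) return ReadResult::Retry;
    std::memcpy( dst, pending.data(), bytesRead );
    pending.erase( pending.begin(), pending.begin() + bytesRead );
    return ReadResult::Done;
  }
};

// Hypothetical message: a byte buffer plus a cursor marking what was read.
struct Msg
{
  std::vector<char> buffer;
  size_t            cursor = 0;
};

const size_t kHeaderSize = 8;

// Read `bodySize` bytes after the header, resuming from msg.cursor.
inline ReadResult ReadBody( Msg &msg, FakeSocket &sock, uint32_t bodySize )
{
  assert( msg.cursor >= kHeaderSize );   // the header must already be read
  if( msg.buffer.size() < kHeaderSize + bodySize )
    msg.buffer.resize( kHeaderSize + bodySize );

  size_t left = bodySize - ( msg.cursor - kHeaderSize );
  while( left )
  {
    size_t n = 0;
    if( sock.Read( msg.buffer.data() + msg.cursor, left, n ) == ReadResult::Retry )
      return ReadResult::Retry;          // come back when more data arrives
    left       -= n;
    msg.cursor += n;
  }
  return ReadResult::Done;
}
```

A caller would invoke ReadBody again each time the socket becomes readable, until it returns ReadResult::Done. No bytes are lost between calls because the cursor, not a local variable, carries the progress.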

◆ GetHeader()

XRootDStatus XrdCl::XRootDTransport::GetHeader ( Message & message,
Socket * socket )
virtual

Read a message header from the socket. The socket is non-blocking, so if there is not enough data the function should return suRetry, in which case it will be called again when more data arrives, with the previously read data still stored in the message buffer.

Parameters
    message  the message buffer
    socket   the socket
Returns
    stOK & suDone if the whole message has been processed; stOK & suRetry if more data is needed; stError on failure

Implements XrdCl::TransportHandler.

Definition at line 307 of file XrdClXRootDTransport.cc.

308 {
309 //--------------------------------------------------------------------------
310 // A new message - allocate the space needed for the header
311 //--------------------------------------------------------------------------
312 if( message.GetCursor() == 0 && message.GetSize() < 8 )
313 message.Allocate( 8 );
314
315 //--------------------------------------------------------------------------
316 // Read the message header
317 //--------------------------------------------------------------------------
318 if( message.GetCursor() < 8 )
319 {
320 size_t leftToBeRead = 8 - message.GetCursor();
321 while( leftToBeRead )
322 {
323 int bytesRead = 0;
324 XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
325 leftToBeRead, bytesRead );
326 if( !status.IsOK() || status.code == suRetry )
327 return status;
328
329 leftToBeRead -= bytesRead;
330 message.AdvanceCursor( bytesRead );
331 }
332 UnMarshallHeader( message );
333
334 uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
335 Log *log = DefaultEnv::GetLog();
336 log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
337 "body", &message, bodySize );
338
339 return XRootDStatus( stOK, suDone );
340 }
341 return XRootDStatus( stError, errInternal );
342 }

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Buffer::Allocate(), XrdCl::Status::code, XrdCl::Log::Dump(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarshallHeader(), and XrdCl::XRootDTransportMsg.
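
The resumable-read contract described for GetHeader can be illustrated with a small, self-contained sketch. It does not use any XrdCl types: FakeSocket, HeaderBuffer, ReadResult and ReadHeader are hypothetical stand-ins, and the fake socket only simulates a non-blocking socket by limiting how many bytes are readable at a time. The point is that the cursor is kept between calls, so a Retry result loses no data.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-ins for illustration; none of these are XrdCl types.
enum class ReadResult { Done, Retry, Error };

// Simulates a non-blocking socket: only `available` bytes can be read
// before the call would block.
struct FakeSocket
{
  std::vector<uint8_t> data;          // bytes sent by the peer
  std::size_t          pos       = 0; // next unread byte in data
  std::size_t          available = 0; // bytes readable right now

  // Copies up to `want` bytes; returns 0 when the read would block.
  std::size_t Read( uint8_t *dst, std::size_t want )
  {
    assert( pos <= data.size() );
    std::size_t n = std::min( { want, available, data.size() - pos } );
    if( n > 0 ) std::memcpy( dst, data.data() + pos, n );
    pos       += n;
    available -= n;
    return n;
  }
};

// The 8-byte header buffer plus a cursor that survives between calls.
struct HeaderBuffer
{
  uint8_t     bytes[8] = {};
  std::size_t cursor   = 0;
};

ReadResult ReadHeader( HeaderBuffer &buf, FakeSocket &sock )
{
  // As in the listing, a call made after the header is complete is an error.
  if( buf.cursor >= 8 ) return ReadResult::Error;

  while( buf.cursor < 8 )
  {
    std::size_t n = sock.Read( buf.bytes + buf.cursor, 8 - buf.cursor );
    if( n == 0 ) return ReadResult::Retry; // keep the cursor, resume later
    buf.cursor += n;
  }
  return ReadResult::Done;
}
```

A caller would invoke ReadHeader again with the same HeaderBuffer each time the socket becomes readable, until it returns Done.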

◆ GetMore()

XRootDStatus XrdCl::XRootDTransport::GetMore ( Message & message,
Socket * socket )
virtual

Read more of the message body from the socket. The socket is non-blocking, so the method may be called multiple times; see GetHeader for details.

Parameters
message: the message buffer containing the header
socket: the socket
Returns
stOK & suDone if the whole message has been processed; stOK & suRetry if more data is needed; stError on failure

Implements XrdCl::TransportHandler.

Definition at line 379 of file XrdClXRootDTransport.cc.

380 {
381 ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
382 if( rsphdr->status != kXR_status )
383 return XRootDStatus( stError, errInvalidOp );
384
385 //--------------------------------------------------------------------------
386 // In case of non kXR_status responses we read all the response, including
387 // data. For kXR_status responses we first read only the remainder of the
388 // header. The header must then be unmarshalled, and then a second call to
389 // GetMore (repeated for suRetry as needed) will read the data.
390 //--------------------------------------------------------------------------
391
392 uint32_t bodySize = rsphdr->dlen;
393 if( bodySize+8 < sizeof( ServerResponseStatus ) )
394 return XRootDStatus( stError, errInvalidMessage, 0,
395 "kXR_status: invalid message size." );
396
397 ServerResponseStatus *rspst = (ServerResponseStatus*)message.GetBuffer();
398 bodySize += rspst->bdy.dlen;
399
400 if( message.GetSize() < bodySize + 8 )
401 message.ReAllocate( bodySize + 8 );
402
403 size_t leftToBeRead = bodySize-(message.GetCursor()-8);
404 while( leftToBeRead )
405 {
406 int bytesRead = 0;
407 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
408
409 if( !status.IsOK() || status.code == suRetry )
410 return status;
411
412 leftToBeRead -= bytesRead;
413 message.AdvanceCursor( bytesRead );
414 }
415
416 // Unmarshal the message body
417 Log *log = DefaultEnv::GetLog();
418 XRootDStatus st = XRootDTransport::UnMarchalStatusMore( message );
419 if( !st.IsOK() && st.code == errDataError )
420 {
421 log->Error( XRootDTransportMsg, "[msg: %p] %s", &message,
422 st.GetErrorMessage().c_str() );
423 return st;
424 }
425
426 if( !st.IsOK() )
427 {
428 log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
429 &message );
430 return st;
431 }
432
433 return XRootDStatus( stOK, suDone );
434 }

References XrdCl::Buffer::AdvanceCursor(), ServerResponseStatus::bdy, XrdCl::Status::code, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errInvalidOp, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), kXR_status, XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarchalStatusMore(), and XrdCl::XRootDTransportMsg.
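
The size arithmetic in the GetMore listing can be sketched as a standalone function. StatusBytesLeft is a hypothetical helper, not part of XrdCl, and the size of the status structure is passed in as a parameter rather than assumed. The sketch only mirrors the length checks and the "bytes still missing" computation; it performs no I/O and no unmarshalling.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper, not part of XrdCl. Computes how many bytes of a
// kXR_status response are still missing from the buffer.
//   hdrDlen    - dlen taken from the 8-byte response header
//   bdyDlen    - dlen taken from the status body (length of trailing data)
//   cursor     - bytes already stored in the buffer, header included
//   statusSize - assumed size of the complete status structure
// Returns false if the sizes are inconsistent, true otherwise.
bool StatusBytesLeft( uint32_t hdrDlen, uint32_t bdyDlen, std::size_t cursor,
                      std::size_t statusSize, std::size_t &left )
{
  const std::size_t headerSize = 8;

  // The header plus its body must be able to hold the status structure.
  if( headerSize + hdrDlen < statusSize ) return false;

  const std::size_t total = headerSize + hdrDlen + bdyDlen;
  if( cursor > total ) return false; // more was read than was announced

  left = total - cursor;
  return true;
}
```

With an 8-byte header, a 16-byte status body and 100 bytes of trailing data, a cursor of 24 leaves 100 bytes to read.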

◆ GetSignature() [1/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *& sign,
AnyObject & channelData )
virtual

Get signature for given message.

Implements XrdCl::TransportHandler.

Definition at line 1703 of file XrdClXRootDTransport.cc.

1704 {
1705 XRootDChannelInfo *info = 0;
1706 channelData.Get( info );
1707 return GetSignature( toSign, sign, info );
1708 }

References XrdCl::AnyObject::Get(), and GetSignature().

Referenced by GetSignature().

◆ GetSignature() [2/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *& sign,
XRootDChannelInfo * info )
virtual

Get signature for given message.

Definition at line 1713 of file XrdClXRootDTransport.cc.

1716 {
1717 XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
1718 if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
1719
1720 ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
1721 if( !info ) return Status( stError, errInternal );
1722 if( info->protection )
1723 {
1724 SecurityRequest *newreq = 0;
1725 // check if we have to secure the request in the first place
1726 if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
1727 // secure (sign/encrypt) the request
1728 int rc = info->protection->Secure( newreq, *thereq, 0 );
1729 // there was an error
1730 if( rc < 0 )
1731 return Status( stError, errInternal, -rc );
1732
1733 sign = new Message();
1734 sign->Grab( reinterpret_cast<char*>( newreq ), rc );
1735 }
1736
1737 return Status();
1738 }

References XrdCl::errInternal, XrdCl::errInvalidOp, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::Grab(), XrdCl::PluginUnloadHandler::lock, NEED2SECURE, XrdCl::XRootDChannelInfo::protection, XrdSecProtect::Secure(), XrdCl::stError, and XrdCl::PluginUnloadHandler::unloaded.

◆ HandShake()

XRootDStatus XrdCl::XRootDTransport::HandShake ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual

HandShake.

Implements XrdCl::TransportHandler.

Definition at line 467 of file XrdClXRootDTransport.cc.

469 {
470 XRootDChannelInfo *info = 0;
471 channelData.Get( info );
472 XrdSysMutexHelper scopedLock( info->mutex );
473
474 if( info->stream.size() <= handShakeData->subStreamId )
475 {
476 Log *log = DefaultEnv::GetLog();
477 log->Error( XRootDTransportMsg,
478 "[%s] Internal error: not enough substreams",
479 handShakeData->streamName.c_str() );
480 return XRootDStatus( stFatal, errInternal );
481 }
482
483 if( handShakeData->subStreamId == 0 )
484 {
485 info->streamName = handShakeData->streamName;
486 return HandShakeMain( handShakeData, channelData );
487 }
488 return HandShakeParallel( handShakeData, channelData );
489 }

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::stFatal, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::XRootDChannelInfo::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.

◆ HandShakeDone()

bool XrdCl::XRootDTransport::HandShakeDone ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual

Implements XrdCl::TransportHandler.

Definition at line 727 of file XrdClXRootDTransport.cc.

729 {
730 XRootDChannelInfo *info = 0;
731 channelData.Get( info );
732 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
733 return ( sInfo.status == XRootDStreamInfo::Connected );
734 }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::AnyObject::Get(), XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.

◆ InitializeChannel()

void XrdCl::XRootDTransport::InitializeChannel ( const URL & url,
AnyObject & channelData )
virtual

Initialize channel.

Implements XrdCl::TransportHandler.

Definition at line 439 of file XrdClXRootDTransport.cc.

441 {
442 XRootDChannelInfo *info = new XRootDChannelInfo( url );
443 XrdSysMutexHelper scopedLock( info->mutex );
444 channelData.Set( info );
445
446 Env *env = DefaultEnv::GetEnv();
447 int streams = DefaultSubStreamsPerChannel;
448 env->GetInt( "SubStreamsPerChannel", streams );
449 if( streams < 1 ) streams = 1;
450 info->stream.resize( streams );
451 info->strmSelector.reset( new StreamSelector( streams ) );
452 info->encrypted = url.IsSecure();
453 info->istpc = url.IsTPC();
454 info->logintoken = url.GetLoginToken();
455 }

References XrdCl::DefaultSubStreamsPerChannel, XrdCl::XRootDChannelInfo::encrypted, XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::URL::GetLoginToken(), XrdCl::URL::IsSecure(), XrdCl::URL::IsTPC(), XrdCl::XRootDChannelInfo::istpc, XrdCl::XRootDChannelInfo::logintoken, XrdCl::XRootDChannelInfo::mutex, XrdCl::AnyObject::Set(), XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.

◆ IsStreamBroken()

Status XrdCl::XRootDTransport::IsStreamBroken ( time_t inactiveTime,
AnyObject & channelData )
virtual

Check whether the stream is broken, i.e. the TCP connection got broken and this went undetected by the TCP stack.

Implements XrdCl::TransportHandler.

Definition at line 785 of file XrdClXRootDTransport.cc.

787 {
788 XRootDChannelInfo *info = 0;
789 channelData.Get( info );
790 Env *env = DefaultEnv::GetEnv();
791 Log *log = DefaultEnv::GetLog();
792
793 int streamTimeout = DefaultStreamTimeout;
794 env->GetInt( "StreamTimeout", streamTimeout );
795
796 XrdSysMutexHelper scopedLock( info->mutex );
797
798 const time_t now = time(0);
799 const bool anySID =
800 info->sidManager->IsAnySIDOldAs( now - streamTimeout );
801
802 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
803 "stream timeout: %d, any SID: %d, wait barrier: %s",
804 info->streamName.c_str(), (long long) inactiveTime, streamTimeout,
805 anySID, Utils::TimeToString(info->waitBarrier).c_str() );
806
807 if( inactiveTime < streamTimeout )
808 return Status();
809
810 if( now < info->waitBarrier )
811 return Status();
812
813 if( !anySID )
814 return Status();
815
816 return Status( stError, errSocketTimeout );
817 }

References XrdCl::DefaultStreamTimeout, XrdCl::Log::Dump(), XrdCl::errSocketTimeout, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::sidManager, XrdCl::stError, XrdCl::XRootDChannelInfo::streamName, XrdCl::Utils::TimeToString(), XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.
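
The decision made at the end of the IsStreamBroken listing reduces to three conditions that must all hold. A minimal sketch, with the channel state passed in as plain values: LooksBroken is a hypothetical helper, not an XrdCl function, and its anyOldSid parameter stands for the result of the IsAnySIDOldAs() call in the listing.

```cpp
#include <cassert>
#include <ctime>

// Hypothetical helper mirroring the three early returns in the listing.
// A stream is reported as broken only if it has been inactive for at least
// the stream timeout, no wait barrier is pending, and at least one
// outstanding request is older than the timeout.
bool LooksBroken( std::time_t inactiveTime, int streamTimeout,
                  std::time_t now, std::time_t waitBarrier, bool anyOldSid )
{
  if( inactiveTime < streamTimeout ) return false; // recently active
  if( now < waitBarrier )            return false; // server asked us to wait
  if( !anyOldSid )                   return false; // nothing is overdue
  return true;
}
```

Keeping the wait barrier in the check means that a server which answered with a wait response is not declared dead while the client is deliberately idle.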

◆ IsStreamTTLElapsed()

bool XrdCl::XRootDTransport::IsStreamTTLElapsed ( time_t time,
AnyObject & channelData )
virtual

Check if the stream should be disconnected.

Implements XrdCl::TransportHandler.

Definition at line 739 of file XrdClXRootDTransport.cc.

741 {
742 XRootDChannelInfo *info = 0;
743 channelData.Get( info );
744 Env *env = DefaultEnv::GetEnv();
745 Log *log = DefaultEnv::GetLog();
746
747 //--------------------------------------------------------------------------
748 // Check the TTL settings for the current server
749 //--------------------------------------------------------------------------
750 int ttl;
751 if( info->serverFlags & kXR_isServer )
752 {
753 ttl = DefaultDataServerTTL;
754 env->GetInt( "DataServerTTL", ttl );
755 }
756 else
757 {
758 ttl = DefaultLoadBalancerTTL;
759 env->GetInt( "LoadBalancerTTL", ttl );
760 }
761
762 //--------------------------------------------------------------------------
763 // See whether we can give a go-ahead for the disconnection
764 //--------------------------------------------------------------------------
765 XrdSysMutexHelper scopedLock( info->mutex );
766 uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
767 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
768 "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
769 info->streamName.c_str(), (long long) inactiveTime, ttl, allocatedSIDs,
770 info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );
771
772 if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
773 return false;
774
775 if( !allocatedSIDs && inactiveTime > ttl )
776 return true;
777
778 return false;
779 }

References XrdCl::DefaultDataServerTTL, XrdCl::DefaultLoadBalancerTTL, XrdCl::Log::Dump(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), kXR_isServer, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDChannelInfo::streamName, and XrdCl::XRootDTransportMsg.
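
The go-ahead logic at the end of the IsStreamTTLElapsed listing can be written as a pure function. TtlElapsed is a hypothetical helper, not an XrdCl function; its parameters stand for the values the listing reads from the channel info and the environment.

```cpp
#include <cassert>
#include <ctime>

// Hypothetical helper mirroring the final checks in the listing.
// As in the listing, the stream is kept alive only when there are BOTH
// open files and bound File object instances; otherwise it may be
// disconnected once no stream IDs are allocated and the TTL has passed.
bool TtlElapsed( std::time_t inactiveTime, int ttl, unsigned allocatedSIDs,
                 unsigned openFiles, unsigned boundFileObjects )
{
  if( openFiles != 0 && boundFileObjects != 0 )
    return false;

  return allocatedSIDs == 0 && inactiveTime > ttl;
}
```

Note that the comparison with the TTL is strict: a stream that has been inactive for exactly ttl seconds is not yet eligible.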

◆ LogErrorResponse()

void XrdCl::XRootDTransport::LogErrorResponse ( const Message & msg)
static

Log server error response.

Definition at line 1454 of file XrdClXRootDTransport.cc.

1455 {
1456 Log *log = DefaultEnv::GetLog();
1457 ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
1458 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
1459 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
1460 log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
1461 rsp->body.error.errnum, errmsg );
1462 delete [] errmsg;
1463 }

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, and XrdCl::XRootDTransportMsg.
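
The LogErrorResponse listing treats the error body as a 4-byte error number followed by dlen-4 bytes of message text, and it assumes that dlen is at least 4. A defensive variant of the text extraction might look like the sketch below. ErrorText is a hypothetical helper, not part of XrdCl, and the guard against short bodies is an addition that the listing does not contain.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper, not part of XrdCl. `body` points at the error body:
// a 4-byte error number followed by the message text. `dlen` is the body
// length taken from the response header.
std::string ErrorText( const char *body, uint32_t dlen )
{
  const uint32_t errnumSize = 4;
  if( body == nullptr || dlen <= errnumSize )
    return std::string(); // no room for any message text

  std::string text( body + errnumSize, dlen - errnumSize );

  // Cut at the first NUL in case the sender counted the terminator.
  const std::string::size_type nul = text.find( '\0' );
  if( nul != std::string::npos ) text.erase( nul );
  return text;
}
```

Returning a std::string also removes the need for the manual new[]/delete[] pair used in the listing.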

◆ MarshallRequest() [1/2]

XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( char * msg)
static

Marshal the outgoing message.

Definition at line 1050 of file XrdClXRootDTransport.cc.

1051 {
1052 ClientRequest *req = (ClientRequest*)msg;
1053 switch( req->header.requestid )
1054 {
1055 //------------------------------------------------------------------------
1056 // kXR_protocol
1057 //------------------------------------------------------------------------
1058 case kXR_protocol:
1059 req->protocol.clientpv = htonl( req->protocol.clientpv );
1060 break;
1061
1062 //------------------------------------------------------------------------
1063 // kXR_login
1064 //------------------------------------------------------------------------
1065 case kXR_login:
1066 req->login.pid = htonl( req->login.pid );
1067 break;
1068
1069 //------------------------------------------------------------------------
1070 // kXR_locate
1071 //------------------------------------------------------------------------
1072 case kXR_locate:
1073 req->locate.options = htons( req->locate.options );
1074 break;
1075
1076 //------------------------------------------------------------------------
1077 // kXR_query
1078 //------------------------------------------------------------------------
1079 case kXR_query:
1080 req->query.infotype = htons( req->query.infotype );
1081 break;
1082
1083 //------------------------------------------------------------------------
1084 // kXR_truncate
1085 //------------------------------------------------------------------------
1086 case kXR_truncate:
1087 req->truncate.offset = htonll( req->truncate.offset );
1088 break;
1089
1090 //------------------------------------------------------------------------
1091 // kXR_mkdir
1092 //------------------------------------------------------------------------
1093 case kXR_mkdir:
1094 req->mkdir.mode = htons( req->mkdir.mode );
1095 break;
1096
1097 //------------------------------------------------------------------------
1098 // kXR_chmod
1099 //------------------------------------------------------------------------
1100 case kXR_chmod:
1101 req->chmod.mode = htons( req->chmod.mode );
1102 break;
1103
1104 //------------------------------------------------------------------------
1105 // kXR_open
1106 //------------------------------------------------------------------------
1107 case kXR_open:
1108 req->open.mode = htons( req->open.mode );
1109 req->open.options = htons( req->open.options );
1110 break;
1111
1112 //------------------------------------------------------------------------
1113 // kXR_read
1114 //------------------------------------------------------------------------
1115 case kXR_read:
1116 req->read.offset = htonll( req->read.offset );
1117 req->read.rlen = htonl( req->read.rlen );
1118 break;
1119
1120 //------------------------------------------------------------------------
1121 // kXR_write
1122 //------------------------------------------------------------------------
1123 case kXR_write:
1124 req->write.offset = htonll( req->write.offset );
1125 break;
1126
1127 //------------------------------------------------------------------------
1128 // kXR_mv
1129 //------------------------------------------------------------------------
1130 case kXR_mv:
1131 req->mv.arg1len = htons( req->mv.arg1len );
1132 break;
1133
1134 //------------------------------------------------------------------------
1135 // kXR_readv
1136 //------------------------------------------------------------------------
1137 case kXR_readv:
1138 {
1139 uint16_t numChunks = (req->readv.dlen)/16;
1140 readahead_list *dataChunk = (readahead_list*)( msg + 24 );
1141 for( size_t i = 0; i < numChunks; ++i )
1142 {
1143 dataChunk[i].rlen = htonl( dataChunk[i].rlen );
1144 dataChunk[i].offset = htonll( dataChunk[i].offset );
1145 }
1146 break;
1147 }
1148
1149 //------------------------------------------------------------------------
1150 // kXR_writev
1151 //------------------------------------------------------------------------
1152 case kXR_writev:
1153 {
1154 uint16_t numChunks = (req->writev.dlen)/16;
1155 XrdProto::write_list *wrtList =
1156 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
1157 for( size_t i = 0; i < numChunks; ++i )
1158 {
1159 wrtList[i].wlen = htonl( wrtList[i].wlen );
1160 wrtList[i].offset = htonll( wrtList[i].offset );
1161 }
1162
1163 break;
1164 }
1165
1166 case kXR_pgread:
1167 {
1168 req->pgread.offset = htonll( req->pgread.offset );
1169 req->pgread.rlen = htonl( req->pgread.rlen );
1170 break;
1171 }
1172
1173 case kXR_pgwrite:
1174 {
1175 req->pgwrite.offset = htonll( req->pgwrite.offset );
1176 break;
1177 }
1178
1179 //------------------------------------------------------------------------
1180 // kXR_prepare
1181 //------------------------------------------------------------------------
1182 case kXR_prepare:
1183 {
1184 req->prepare.optionX = htons( req->prepare.optionX );
1185 req->prepare.port = htons( req->prepare.port );
1186 break;
1187 }
1188
1189 case kXR_chkpoint:
1190 {
1191 if( req->chkpoint.opcode == kXR_ckpXeq )
1192 MarshallRequest( msg + 24 );
1193 break;
1194 }
1195 };
1196
1197 req->header.requestid = htons( req->header.requestid );
1198 req->header.dlen = htonl( req->header.dlen );
1199 return XRootDStatus();
1200 }

References ClientMvRequest::arg1len, ClientRequest::chkpoint, ClientRequest::chmod, ClientProtocolRequest::clientpv, ClientRequestHdr::dlen, ClientReadVRequest::dlen, ClientWriteVRequest::dlen, ClientRequest::header, ClientQueryRequest::infotype, kXR_chkpoint, kXR_chmod, kXR_ckpXeq, kXR_locate, kXR_login, kXR_mkdir, kXR_mv, kXR_open, kXR_pgread, kXR_pgwrite, kXR_prepare, kXR_protocol, kXR_query, kXR_read, kXR_readv, kXR_truncate, kXR_write, kXR_writev, ClientRequest::locate, ClientRequest::login, MarshallRequest(), ClientRequest::mkdir, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientRequest::mv, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, readahead_list::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, XrdProto::write_list::offset, ClientChkPointRequest::opcode, ClientRequest::open, ClientLocateRequest::options, ClientOpenRequest::options, ClientPrepareRequest::optionX, ClientRequest::pgread, ClientRequest::pgwrite, ClientLoginRequest::pid, ClientPrepareRequest::port, ClientRequest::prepare, ClientRequest::protocol, ClientRequest::query, ClientRequest::read, ClientRequest::readv, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, ClientRequest::truncate, XrdProto::write_list::wlen, ClientRequest::write, and ClientRequest::writev.
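
Marshalling in the listing above means converting each multi-byte field from host to network (big-endian) byte order, field by field, depending on the request type. The sketch below shows the same idea on a made-up request. ToNet16, ToNet32, ToNet64, DemoReadRequest and MarshallDemo are hypothetical names, and the conversions are written with shifts so that the example does not depend on platform headers; the listing itself uses htons(), htonl() and htonll().

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Each function returns a value whose in-memory bytes are in network
// (big-endian) order, regardless of the host byte order.
uint16_t ToNet16( uint16_t v )
{
  const uint8_t b[2] = { uint8_t( v >> 8 ), uint8_t( v ) };
  uint16_t out;
  std::memcpy( &out, b, sizeof out );
  return out;
}

uint32_t ToNet32( uint32_t v )
{
  const uint8_t b[4] = { uint8_t( v >> 24 ), uint8_t( v >> 16 ),
                         uint8_t( v >> 8 ),  uint8_t( v ) };
  uint32_t out;
  std::memcpy( &out, b, sizeof out );
  return out;
}

uint64_t ToNet64( uint64_t v )
{
  uint8_t b[8];
  for( int i = 0; i < 8; ++i )
    b[i] = uint8_t( v >> ( 56 - 8 * i ) ); // most significant byte first
  uint64_t out;
  std::memcpy( &out, b, sizeof out );
  return out;
}

// Made-up request for illustration only; not an XProtocol structure.
struct DemoReadRequest
{
  uint16_t requestid;
  uint32_t rlen;
  uint64_t offset;
};

// Convert the request-specific fields first and the common ones last,
// following the order used in the listing.
void MarshallDemo( DemoReadRequest &req )
{
  req.offset    = ToNet64( req.offset );
  req.rlen      = ToNet32( req.rlen );
  req.requestid = ToNet16( req.requestid );
}
```

Because each conversion is either a byte swap or a no-op, applying MarshallDemo twice restores the original values on any host.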

◆ MarshallRequest() [2/2]

static XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( Message * msg)
inlinestatic

Marshal the outgoing message.

Definition at line 175 of file XrdClXRootDTransport.hh.

176 {
177 MarshallRequest( msg->GetBuffer() );
178 msg->SetIsMarshalled( true );
179 return XRootDStatus();
180 }

References XrdCl::Buffer::GetBuffer(), MarshallRequest(), and XrdCl::Message::SetIsMarshalled().

Referenced by MarshallRequest(), MarshallRequest(), MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), XrdCl::MessageUtils::SendMessage(), and UnMarshallRequest().

◆ MessageReceived()

uint32_t XrdCl::XRootDTransport::MessageReceived ( Message & msg,
uint16_t subStream,
AnyObject & channelData )
virtual

Check if the message invokes a stream action.

Implements XrdCl::TransportHandler.

Definition at line 1561 of file XrdClXRootDTransport.cc.

1564 {
1565 XRootDChannelInfo *info = 0;
1566 channelData.Get( info );
1567 XrdSysMutexHelper scopedLock( info->mutex );
1568 Log *log = DefaultEnv::GetLog();
1569
1570 //--------------------------------------------------------------------------
1571 // Update the substream queues
1572 //--------------------------------------------------------------------------
1573 info->strmSelector->MsgReceived( subStream );
1574
1575 //--------------------------------------------------------------------------
1576 // Check whether this message is a response to a request that has
1577 // timed out, and if so, drop it
1578 //--------------------------------------------------------------------------
1579 ServerResponse *rsp = (ServerResponse*)msg.GetBuffer();
1580 if( rsp->hdr.status == kXR_attn )
1581 {
1582 return NoAction;
1583 }
1584
1585 if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1586 {
1587 log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1588 "response that we're no longer interested in (timed out)",
1589 &msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1590 //------------------------------------------------------------------------
1591 // If it is kXR_waitresp there will be another one,
1592 // so we don't release the sid yet
1593 //------------------------------------------------------------------------
1594 if( rsp->hdr.status != kXR_waitresp )
1595 info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1596 //------------------------------------------------------------------------
1597 // If it is a successful response to an open request
1598 // that timed out, we need to send a close
1599 //------------------------------------------------------------------------
1600 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1601 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1602 if( sidIt != info->sentOpens.end() )
1603 {
1604 info->sentOpens.erase( sidIt );
1605 if( rsp->hdr.status == kXR_ok ) return RequestClose;
1606 }
1607 return DigestMsg;
1608 }
1609
1610 //--------------------------------------------------------------------------
1611 // If we have a wait or waitresp
1612 //--------------------------------------------------------------------------
1613 uint32_t seconds = 0;
1614 if( rsp->hdr.status == kXR_wait )
1615 seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1616 // to re-send the request
1617 else if( rsp->hdr.status == kXR_waitresp )
1618 {
1619 seconds = ntohl( rsp->body.waitresp.seconds );
1620
1621 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1622 "setting up wait barrier.",
1623 info->streamName.c_str(),
1624 seconds );
1625 }
1626
1627 time_t barrier = time(0) + seconds;
1628 if( info->waitBarrier < barrier )
1629 info->waitBarrier = barrier;
1630
1631 //--------------------------------------------------------------------------
1632 // If we got a response to an open request, we may need to bump the counter
1633 // of open files
1634 //--------------------------------------------------------------------------
1635 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1636 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1637 if( sidIt != info->sentOpens.end() )
1638 {
1639 if( rsp->hdr.status == kXR_waitresp )
1640 return NoAction;
1641 info->sentOpens.erase( sidIt );
1642 if( rsp->hdr.status == kXR_ok )
1643 {
1644 ++info->openFiles;
1645 info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another File object instance has been bound to this connection
1646 }
1647 return NoAction;
1648 }
1649
1650 //--------------------------------------------------------------------------
1651 // If we got a response to a close, we may need to decrement the counter of
1652 // open files
1653 //--------------------------------------------------------------------------
1654 sidIt = info->sentCloses.find( sid );
1655 if( sidIt != info->sentCloses.end() )
1656 {
1657 if( rsp->hdr.status == kXR_waitresp )
1658 return NoAction;
1659 info->sentCloses.erase( sidIt );
1660 --info->openFiles;
1661 return NoAction;
1662 }
1663 return NoAction;
1664 }

References ServerResponse::body, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, kXR_attn, kXR_ok, kXR_wait, kXR_waitresp, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportHandler::NoAction, XrdCl::XRootDChannelInfo::openFiles, XrdCl::TransportHandler::RequestClose, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, ServerResponseHeader::status, ServerResponseHeader::streamid, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, XrdCl::XRootDChannelInfo::waitBarrier, XrdCl::XRootDMsg, and XrdCl::XRootDTransportMsg.
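
The wait-barrier update in the MessageReceived listing can be isolated into a small function. NextBarrier and WaitKind are hypothetical names, not XrdCl identifiers. The sketch assumes the number of seconds has already been converted to host byte order, which the listing does with ntohl().

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <ctime>

// Hypothetical classification of the response status.
enum class WaitKind { None, Wait, WaitResp };

// Returns the new wait barrier. The barrier only ever moves forward, and a
// plain wait gets 5 extra seconds because the request must be re-sent.
std::time_t NextBarrier( std::time_t current, std::time_t now,
                         WaitKind kind, uint32_t seconds )
{
  uint32_t delay = 0;
  if( kind == WaitKind::Wait )
    delay = seconds + 5;
  else if( kind == WaitKind::WaitResp )
    delay = seconds;

  const std::time_t barrier = now + static_cast<std::time_t>( delay );
  return std::max( current, barrier );
}
```

For a response that is neither kind of wait the delay is zero, so the barrier is simply raised to the current time if it was earlier.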

◆ MessageSent()

void XrdCl::XRootDTransport::MessageSent ( Message * msg,
uint16_t subStream,
uint32_t bytesSent,
AnyObject & channelData )
virtual

Notify the transport about a message having been sent.

Implements XrdCl::TransportHandler.

Definition at line 1669 of file XrdClXRootDTransport.cc.

1673 {
1674 // Called when a message has been sent. For messages that return on a
1675 // different pathid (and hence may use a different poller) it is possible
1676 // that the server has already replied and the reply will trigger
1677 // MessageReceived() before this method has been called. However for open
1678 // and close this is never the case and this method is used for tracking
1679 // only those.
1680 XRootDChannelInfo *info = 0;
1681 channelData.Get( info );
1682 XrdSysMutexHelper scopedLock( info->mutex );
1683 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1684 uint16_t reqid = ntohs( req->header.requestid );
1685
1686
1687 //--------------------------------------------------------------------------
1688 // We need to track opens to know if we can close streams due to idleness
1689 //--------------------------------------------------------------------------
1690 uint16_t sid;
1691 memcpy( &sid, req->header.streamid, 2 );
1692
1693 if( reqid == kXR_open )
1694 info->sentOpens.insert( sid );
1695 else if( reqid == kXR_close )
1696 info->sentCloses.insert( sid );
1697 }

References XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_close, kXR_open, XrdCl::XRootDChannelInfo::mutex, ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, and ClientRequestHdr::streamid.
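
MessageSent and MessageReceived cooperate to maintain the count of open files: the first records the stream IDs of open and close requests, the second matches responses against them. The sketch below models that bookkeeping in isolation. OpenCloseTracker and Reply are hypothetical names, and the handling of timed-out requests shown in the MessageReceived listing is deliberately left out.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical classification of a server response.
enum class Reply { Ok, Error, WaitResp };

struct OpenCloseTracker
{
  std::set<uint16_t> sentOpens;     // stream IDs of pending open requests
  std::set<uint16_t> sentCloses;    // stream IDs of pending close requests
  int                openFiles = 0;

  // Counterparts of the two branches at the end of MessageSent.
  void OpenSent( uint16_t sid )  { sentOpens.insert( sid ); }
  void CloseSent( uint16_t sid ) { sentCloses.insert( sid ); }

  // Counterpart of the open/close matching in MessageReceived.
  void ReplyReceived( uint16_t sid, Reply reply )
  {
    std::set<uint16_t>::iterator it = sentOpens.find( sid );
    if( it != sentOpens.end() )
    {
      if( reply == Reply::WaitResp ) return; // the final reply is still due
      sentOpens.erase( it );
      if( reply == Reply::Ok ) ++openFiles;
      return;
    }

    it = sentCloses.find( sid );
    if( it != sentCloses.end() )
    {
      if( reply == Reply::WaitResp ) return;
      sentCloses.erase( it );
      --openFiles; // as in the listing, whatever the outcome of the close
    }
  }
};
```

A deferred response leaves the stream ID registered, so the counter changes only when the final reply arrives.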

◆ Multiplex()

PathID XrdCl::XRootDTransport::Multiplex ( Message * msg,
AnyObject & channelData,
PathID * hint = 0 )
virtual

Return the ID of the up stream on which this message should be sent and of the down stream on which the answer should be expected. Modify the message itself if necessary. If hint is non-zero, the message should be modified so that the answer is returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 822 of file XrdClXRootDTransport.cc.

823 {
824 return PathID( 0, 0 );
825 }

◆ MultiplexSubStream()

PathID XrdCl::XRootDTransport::MultiplexSubStream ( Message * msg,
AnyObject & channelData,
PathID * hint = 0 )
virtual

Return the ID of the up substream on which this message should be sent and of the down substream on which the answer should be expected. Modify the message itself if necessary. If hint is non-zero, the message should be modified so that the answer is returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 830 of file XrdClXRootDTransport.cc.

833 {
834 XRootDChannelInfo *info = 0;
835 channelData.Get( info );
836 XrdSysMutexHelper scopedLock( info->mutex );
837
838 //--------------------------------------------------------------------------
839 // If we're not connected to a data server or we don't know that yet
840 // we stream through 0
841 //--------------------------------------------------------------------------
842 if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
843 return PathID( 0, 0 );
844
845 //--------------------------------------------------------------------------
846 // Select the streams
847 //--------------------------------------------------------------------------
848 Log *log = DefaultEnv::GetLog();
849 uint16_t upStream = 0;
850 uint16_t downStream = 0;
851
852 if( hint )
853 {
854 upStream = hint->up;
855 downStream = hint->down;
856 }
857 else
858 {
859 upStream = 0;
860 std::vector<bool> connected;
861 connected.reserve( info->stream.size() - 1 );
862 size_t nbConnected = 0;
863 for( size_t i = 1; i < info->stream.size(); ++i )
864 if( info->stream[i].status == XRootDStreamInfo::Connected )
865 {
866 connected.push_back( true );
867 ++nbConnected;
868 }
869 else
870 connected.push_back( false );
871
872 if( nbConnected == 0 )
873 downStream = 0;
874 else
875 downStream = info->strmSelector->Select( connected );
876 }
877
878 if( upStream >= info->stream.size() )
879 {
880 log->Debug( XRootDTransportMsg,
881 "[%s] Up link stream %d does not exist, using 0",
882 info->streamName.c_str(), upStream );
883 upStream = 0;
884 }
885
886 if( downStream >= info->stream.size() )
887 {
888 log->Debug( XRootDTransportMsg,
889 "[%s] Down link stream %d does not exist, using 0",
890 info->streamName.c_str(), downStream );
891 downStream = 0;
892 }
893
894 //--------------------------------------------------------------------------
895 // Modify the message
896 //--------------------------------------------------------------------------
897 UnMarshallRequest( msg );
898 ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
899 switch( hdr->requestid )
900 {
901 //------------------------------------------------------------------------
902 // Read - we update the path id to tell the server where we want to
903 // get the response, but we still send the request through stream 0
904 // We need to allocate space for read_args if we don't have it
905 // included yet
906 //------------------------------------------------------------------------
907 case kXR_read:
908 {
909 if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
910 {
911 msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
912 void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
913 memset( newBuf, 0, 8 );
914 ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
915 req->dlen += 8;
916 }
917 read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
918 args->pathid = info->stream[downStream].pathId;
919 break;
920 }
921
922
923 //------------------------------------------------------------------------
924 // PgRead - we update the path id to tell the server where we want to
925 // get the response, but we still send the request through stream 0
926 // We need to allocate space for ClientPgReadReqArgs if we don't have it
927 // included yet
928 //------------------------------------------------------------------------
929 case kXR_pgread:
930 {
931 if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
932 {
933 msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
934 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
935 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
936 ClientPgReadRequest *req = (ClientPgReadRequest*)msg->GetBuffer();
937 req->dlen += sizeof( ClientPgReadReqArgs );
938 }
939 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
940 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
941 args->pathid = info->stream[downStream].pathId;
942 break;
943 }
944
945 //------------------------------------------------------------------------
946 // ReadV - the situation is identical to read but we don't need any
947 // additional structures to specify the return path
948 //------------------------------------------------------------------------
949 case kXR_readv:
950 {
951 ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
952 req->pathid = info->stream[downStream].pathId;
953 break;
954 }
955
956 //------------------------------------------------------------------------
957 // Write - multiplexing writes doesn't work properly in the server
958 //------------------------------------------------------------------------
959 case kXR_write:
960 {
961// ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
962// req->pathid = info->stream[downStream].pathId;
963 break;
964 }
965
966 //------------------------------------------------------------------------
967 // WriteV - multiplexing writes doesn't work properly in the server
968 //------------------------------------------------------------------------
969 case kXR_writev:
970 {
971// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
972// req->pathid = info->stream[downStream].pathId;
973 break;
974 }
975
976 //------------------------------------------------------------------------
977 // PgWrite - multiplexing writes doesn't work properly in the server
978 //------------------------------------------------------------------------
979 case kXR_pgwrite:
980 {
981// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
982// req->pathid = info->stream[downStream].pathId;
983 break;
984 }
985 };
986 MarshallRequest( msg );
987 return PathID( upStream, downStream );
988 }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Debug(), ClientPgReadRequest::dlen, ClientReadRequest::dlen, XrdCl::PathID::down, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), kXR_isServer, kXR_pgread, kXR_pgwrite, kXR_read, kXR_readv, kXR_write, kXR_writev, MarshallRequest(), XrdCl::XRootDChannelInfo::mutex, ClientPgReadReqArgs::pathid, read_args::pathid, ClientReadVRequest::pathid, XrdCl::Buffer::ReAllocate(), ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, UnMarshallRequest(), XrdCl::PathID::up, and XrdCl::XRootDTransportMsg.
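
The stream-selection part of MultiplexSubStream() can be sketched on its own. This is a simplified illustration under stated assumptions: `StreamState`, `SelectDownStream` and `ClampStream` are hypothetical names, and the "lowest connected index" policy is chosen only for the sketch; the listing delegates the choice to `strmSelector->Select()`, whose policy is not shown on this page.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stream states; the listing tests XRootDStreamInfo::Connected.
enum class StreamState { Disconnected, Connected };

// Pick the down stream for a response. Sub-stream 0 is the control stream
// and is the fallback whenever no data stream is connected.
// Assumption: the first connected data stream wins (not the real policy).
inline uint16_t SelectDownStream( const std::vector<StreamState> &streams )
{
  for( size_t i = 1; i < streams.size(); ++i )
    if( streams[i] == StreamState::Connected )
      return static_cast<uint16_t>( i );
  return 0;
}

// Clamp a requested sub-stream index to the ones that exist, falling back
// to 0 as the listing does for both the up link and the down link.
inline uint16_t ClampStream( uint16_t requested, size_t nbStreams )
{
  return requested >= nbStreams ? 0 : requested;
}
```

Whatever the selection policy is, the clamping step guarantees that the returned PathID never refers to a sub-stream that does not exist.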


◆ NbConnectedStrm()

uint16_t XrdCl::XRootDTransport::NbConnectedStrm ( AnyObject & channelData)
static

Number of currently connected data streams.

Definition at line 1468 of file XrdClXRootDTransport.cc.

1469 {
1470 XRootDChannelInfo *info = 0;
1471 channelData.Get( info );
1472 XrdSysMutexHelper scopedLock( info->mutex );
1473
1474 uint16_t nbConnected = 0;
1475 for( size_t i = 1; i < info->stream.size(); ++i )
1476 if( info->stream[i].status == XRootDStreamInfo::Connected )
1477 ++nbConnected;
1478
1479 return nbConnected;
1480 }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::AnyObject::Get(), XrdCl::XRootDChannelInfo::mutex, and XrdCl::XRootDChannelInfo::stream.

Referenced by XrdCl::Channel::NbConnectedStrm().


◆ NeedControlConnection()

virtual bool XrdCl::XRootDTransport::NeedControlConnection ( )
inlinevirtual

Return whether a control connection needs to be valid before establishing other connections.

Definition at line 167 of file XrdClXRootDTransport.hh.

168 {
169 return true;
170 }

◆ NeedEncryption()

bool XrdCl::XRootDTransport::NeedEncryption ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual
Returns true if encryption should be turned on, false otherwise.

Implements XrdCl::TransportHandler.

Definition at line 1763 of file XrdClXRootDTransport.cc.

1765 {
1766 XRootDChannelInfo *info = 0;
1767 channelData.Get( info );
1768
1769 Env *env = DefaultEnv::GetEnv();
1770 int notlsok = DefaultNoTlsOK;
1771 env->GetInt( "NoTlsOK", notlsok );
1772
1773 if( notlsok )
1774 return info->encrypted;
1775
1776 // Did the server instruct us to switch to TLS right away?
1777 if( info->serverFlags & kXR_gotoTLS )
1778 {
1779 info->encrypted = true;
1780 return true ;
1781 }
1782
1783 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1784
1785 //--------------------------------------------------------------------------
1786 // The control stream (sub-stream 0) might need to switch to TLS before
1787 // login or after login
1788 //--------------------------------------------------------------------------
1789 if( handShakeData->subStreamId == 0 )
1790 {
1791 //------------------------------------------------------------------------
1792 // We are about to login and the server asked to start encrypting
1793 // before login
1794 //------------------------------------------------------------------------
1795 if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1796 ( info->serverFlags & kXR_tlsLogin ) )
1797 {
1798 info->encrypted = true;
1799 return true;
1800 }
1801
1802 //--------------------------------------------------------------------
1803 // The hand-shake is done and the server requested to encrypt the session
1804 //--------------------------------------------------------------------
1805 if( (sInfo.status == XRootDStreamInfo::Connected ||
1806 //--------------------------------------------------------------------
1807 // we really need to turn on TLS before we send kXR_endsess and we
1808 // are about to do so (1st enable encryption, then send kXR_endsess)
1809 //--------------------------------------------------------------------
1810 sInfo.status == XRootDStreamInfo::EndSessionSent ) &&
1811 ( info->serverFlags & kXR_tlsSess ) )
1812 {
1813 info->encrypted = true;
1814 return true;
1815 }
1816 }
1817 //--------------------------------------------------------------------------
1818 // A data stream (sub-stream > 0) will, if need be, be switched to TLS
1819 // before bind.
1820 //--------------------------------------------------------------------------
1821 else
1822 {
1823 //------------------------------------------------------------------------
1824 // We are about to bind a data stream and the server asked to start
1825 // encrypting before bind
1826 //------------------------------------------------------------------------
1827 if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1828 ( info->serverFlags & kXR_tlsData ) )
1829 {
1830 info->encrypted = true;
1831 return true;
1832 }
1833 }
1834
1835 return false;
1836 }

References XrdCl::XRootDStreamInfo::BindSent, XrdCl::XRootDStreamInfo::Connected, XrdCl::DefaultNoTlsOK, XrdCl::XRootDChannelInfo::encrypted, XrdCl::XRootDStreamInfo::EndSessionSent, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), kXR_gotoTLS, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDStreamInfo::LoginSent, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.
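
The decision order in NeedEncryption() can be restated as a pure function. The sketch below is illustrative only: the flag values are placeholders invented for the example (the real kXR_gotoTLS, kXR_tlsLogin, kXR_tlsSess and kXR_tlsData constants come from XProtocol.hh), `StreamStatus` and `NeedEncryptionSketch` are hypothetical names, and the side effect of setting `info->encrypted` is left out.

```cpp
#include <cstdint>

// Placeholder bit values for this sketch only; not the protocol's values.
constexpr uint32_t kGotoTLS  = 1u << 0;
constexpr uint32_t kTlsLogin = 1u << 1;
constexpr uint32_t kTlsSess  = 1u << 2;
constexpr uint32_t kTlsData  = 1u << 3;

// Hypothetical subset of the stream states tested in the listing.
enum class StreamStatus { Other, LoginSent, Connected, EndSessionSent, BindSent };

// Same decision order as the listing, without updating any channel state.
inline bool NeedEncryptionSketch( bool noTlsOk, bool alreadyEncrypted,
                                  uint32_t serverFlags, uint16_t subStreamId,
                                  StreamStatus status )
{
  // With NoTlsOK set, the server flags are ignored entirely.
  if( noTlsOk ) return alreadyEncrypted;

  // The server wants TLS right away, on any sub-stream.
  if( serverFlags & kGotoTLS ) return true;

  if( subStreamId == 0 )
  {
    // Control stream: encrypt before login, or once the session is up
    // (including just before the end-session request is sent).
    if( status == StreamStatus::LoginSent && ( serverFlags & kTlsLogin ) )
      return true;
    if( ( status == StreamStatus::Connected ||
          status == StreamStatus::EndSessionSent ) &&
        ( serverFlags & kTlsSess ) )
      return true;
    return false;
  }

  // Data stream: encrypt before bind if the server asks for TLS on data.
  return status == StreamStatus::BindSent && ( serverFlags & kTlsData );
}
```

Written this way, it is easy to see that a data-TLS flag alone never triggers encryption of the control stream, and that the NoTlsOK setting short-circuits every server request.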


◆ Query()

Status XrdCl::XRootDTransport::Query ( uint16_t query,
AnyObject & result,
AnyObject & channelData )
virtual

Query the channel.

Implements XrdCl::TransportHandler.

Definition at line 1513 of file XrdClXRootDTransport.cc.

1516 {
1517 XRootDChannelInfo *info = 0;
1518 channelData.Get( info );
1519 XrdSysMutexHelper scopedLock( info->mutex );
1520
1521 switch( query )
1522 {
1523 //------------------------------------------------------------------------
1524 // Protocol name
1525 //------------------------------------------------------------------------
1526 case TransportQuery::Name:
1527 result.Set( (const char*)"XRootD", false );
1528 return Status();
1529
1530 //------------------------------------------------------------------------
1531 // Authentication
1532 //------------------------------------------------------------------------
1533 case TransportQuery::Auth:
1534 result.Set( new std::string( info->authProtocolName ), false );
1535 return Status();
1536
1537 //------------------------------------------------------------------------
1538 // Server flags
1539 //------------------------------------------------------------------------
1540 case XRootDQuery::ServerFlags:
1541 result.Set( new int( info->serverFlags ), false );
1542 return Status();
1543
1544 //------------------------------------------------------------------------
1545 // Protocol version
1546 //------------------------------------------------------------------------
1547 case XRootDQuery::ProtocolVersion:
1548 result.Set( new int( info->protocolVersion ), false );
1549 return Status();
1550
1551 case XRootDQuery::IsEncrypted:
1552 result.Set( new bool( info->encrypted ), false );
1553 return Status();
1554 };
1555 return Status( stError, errQueryNotSupported );
1556 }
const uint16_t errQueryNotSupported
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Authentication protocol name, returns std::string *.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted

References XrdCl::TransportQuery::Auth, XrdCl::XRootDChannelInfo::authProtocolName, XrdCl::XRootDChannelInfo::encrypted, XrdCl::errQueryNotSupported, XrdCl::AnyObject::Get(), XrdCl::XRootDQuery::IsEncrypted, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportQuery::Name, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::XRootDChannelInfo::protocolVersion, XrdCl::XRootDQuery::ServerFlags, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::AnyObject::Set(), and XrdCl::stError.


◆ SetDescription()

static void XrdCl::XRootDTransport::SetDescription ( Message * msg)
inlinestatic

Generate the description of a message and attach it to the message.

Definition at line 245 of file XrdClXRootDTransport.hh.

246 {
247 std::ostringstream o;
248 GenerateDescription( msg->GetBuffer(), o );
249 msg->SetDescription( o.str() );
250 }

References GenerateDescription(), XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetDescription().

Referenced by XrdCl::FileStateHandler::Checkpoint(), XrdCl::FileStateHandler::ChkptWrt(), XrdCl::FileStateHandler::ChkptWrtV(), XrdCl::FileSystem::ChMod(), XrdCl::FileStateHandler::Close(), XrdCl::FileSystem::DirList(), XrdCl::FileStateHandler::Fcntl(), XrdCl::FileSystem::Locate(), XrdCl::FileSystem::MkDir(), XrdCl::FileSystem::Mv(), XrdCl::FileStateHandler::Open(), XrdCl::FileStateHandler::PgReadImpl(), XrdCl::FileStateHandler::PgWriteImpl(), XrdCl::FileSystem::Ping(), XrdCl::FileSystem::Prepare(), XrdCl::FileSystem::Protocol(), XrdCl::FileSystem::Query(), XrdCl::FileStateHandler::Read(), XrdCl::FileStateHandler::ReadV(), XrdCl::MessageUtils::RewriteCGIAndPath(), XrdCl::FileSystem::Rm(), XrdCl::FileSystem::RmDir(), XrdCl::FileSystem::Stat(), XrdCl::FileStateHandler::Stat(), XrdCl::FileSystem::StatVFS(), XrdCl::FileStateHandler::Sync(), XrdCl::FileSystem::Truncate(), XrdCl::FileStateHandler::Truncate(), XrdCl::FileStateHandler::VectorRead(), XrdCl::FileStateHandler::VectorWrite(), XrdCl::FileStateHandler::Visa(), XrdCl::FileStateHandler::Write(), and XrdCl::FileStateHandler::WriteV().


◆ SubStreamNumber()

uint16_t XrdCl::XRootDTransport::SubStreamNumber ( AnyObject & channelData)
virtual

Return the number of substreams per stream that should be created.

Implements XrdCl::TransportHandler.

Definition at line 995 of file XrdClXRootDTransport.cc.

996 {
997 XRootDChannelInfo *info = 0;
998 channelData.Get( info );
999 XrdSysMutexHelper scopedLock( info->mutex );
1000
1001 //--------------------------------------------------------------------------
1002 // If the connection has been opened in order to orchestrate a TPC or
1003 // the remote server is a Manager or Metamanager we will need only one
1004 // (control) stream.
1005 //--------------------------------------------------------------------------
1006 if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1007
1008 //--------------------------------------------------------------------------
1009 // Number of streams requested by user
1010 //--------------------------------------------------------------------------
1011 uint16_t ret = info->stream.size();
1012
1013 Env *env = DefaultEnv::GetEnv();
1014 int nodata = DefaultTlsNoData;
1015 env->GetInt( "TlsNoData", nodata );
1016
1017 // Does the server require the stream 0 to be encrypted?
1018 bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1019 ( info->serverFlags & kXR_tlsLogin ) ||
1020 ( info->serverFlags & kXR_tlsSess );
1021 // Does the server NOT require the data streams to be encrypted?
1022 bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1023 // Does the user require the stream 0 to be encrypted?
1024 bool usrTlsStrm0 = info->encrypted;
1025 // Does the user NOT require the data streams to be encrypted?
1026 bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1027
1028 if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1029 ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1030 {
1031 //------------------------------------------------------------------------
1032 // The server or user asked us to encrypt stream 0, but to send the data
1033 // (read/write) using a plain TCP connection
1034 //------------------------------------------------------------------------
1035 if( ret == 1 ) ++ret;
1036 }
1037
1038 if( ret > info->stream.size() )
1039 {
1040 info->stream.resize( ret );
1041 info->strmSelector->AdjustQueues( ret );
1042 }
1043
1044 return ret;
1045 }

References XrdCl::DefaultTlsNoData, XrdCl::XRootDChannelInfo::encrypted, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::XRootDChannelInfo::istpc, kXR_gotoTLS, kXR_isServer, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.
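
The rule that decides whether SubStreamNumber() adds an extra, unencrypted data stream can be isolated as follows. This is a sketch under assumptions: `TlsPolicy` and `SubStreamCount` are hypothetical names, the four booleans stand in for the server flags, `info->encrypted` and the TlsNoData setting, and the early return for TPC or non-data-server connections is omitted.

```cpp
#include <cstdint>

// Hypothetical summary of the inputs used by the listing.
struct TlsPolicy
{
  bool srvTlsStrm0;   // server requires stream 0 to be encrypted
  bool srvTlsData;    // server requires data streams to be encrypted
  bool usrEncrypted;  // user asked for an encrypted channel
  bool usrTlsNoData;  // user set TlsNoData (data may go unencrypted)
};

// Return the number of sub-streams to create, given the number requested.
inline uint16_t SubStreamCount( uint16_t requested, const TlsPolicy &p )
{
  bool srvNoTlsData = !p.srvTlsData;
  bool usrTlsStrm0  = p.usrEncrypted;
  // Equivalent to !encrypted || ( encrypted && nodata ) in the listing.
  bool usrNoTlsData = !p.usrEncrypted || p.usrTlsNoData;

  bool split = ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
               ( p.srvTlsStrm0 && srvNoTlsData && usrNoTlsData );

  // Stream 0 is encrypted but data may travel in the clear: make sure
  // there is at least one separate data stream.
  if( split && requested == 1 ) return 2;
  return requested;
}
```

A second stream is only ever added when exactly one was requested; larger requests already include data streams and are returned unchanged.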


◆ UnMarchalStatusMore()

XRootDStatus XrdCl::XRootDTransport::UnMarchalStatusMore ( Message & msg)
static

Unmarshall the correction-segment of the status response for pgwrite.

Definition at line 1381 of file XrdClXRootDTransport.cc.

1382 {
1383 ServerResponseV2 *rsp = (ServerResponseV2*)msg.GetBuffer();
1384 uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1385
1386 switch( reqType )
1387 {
1388 case kXR_pgwrite:
1389 {
1390 //--------------------------------------------------------------------------
1391 // If there's no additional data there's nothing to unmarshal
1392 //--------------------------------------------------------------------------
1393 if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1394 //--------------------------------------------------------------------------
1395 // If there's not enough data to form a correction-segment, report an error
1396 //--------------------------------------------------------------------------
1397 if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1398 return XRootDStatus( stError, errInvalidMessage, 0,
1399 "kXR_status: invalid message size." );
1400
1401 //--------------------------------------------------------------------------
1402 // Calculate the crc32c for the additional data
1403 //--------------------------------------------------------------------------
1404 ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)msg.GetBuffer( sizeof( ServerResponseV2 ) );
1405 cse->cseCRC = ntohl( cse->cseCRC );
1406 size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1407 void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1408 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1409
1410 //--------------------------------------------------------------------------
1411 // Do the integrity checks
1412 //--------------------------------------------------------------------------
1413 if( crcval != cse->cseCRC )
1414 {
1415 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1416 "corrupted (crc32c integrity check failed)." );
1417 }
1418
1419 cse->dlFirst = ntohs( cse->dlFirst );
1420 cse->dlLast = ntohs( cse->dlLast );
1421
1422 size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1423 sizeof( kXR_int64 );
1424 kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1425 sizeof( ServerResponseBody_pgWrCSE ) );
1426
1427 for( size_t i = 0; i < pgcnt; ++i )
1428 pgoffs[i] = ntohll( pgoffs[i] );
1429
1430 return XRootDStatus();
1431 break;
1432 }
1433
1434 default:
1435 break;
1436 }
1437
1438 return XRootDStatus( stError, errNotSupported );
1439 }

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_pgWrCSE::cseCRC, ServerResponseBody_Status::dlen, ServerResponseBody_pgWrCSE::dlFirst, ServerResponseBody_pgWrCSE::dlLast, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errNotSupported, XrdCl::Buffer::GetBuffer(), kXR_1stRequest, kXR_pgwrite, ServerResponseBody_Status::requestid, ServerResponseV2::status, and XrdCl::stError.
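
The integrity check above relies on CRC-32C. As a self-contained illustration, here is a plain bitwise CRC-32C (Castagnoli polynomial, reflected form) together with the page-offset count computed from the remaining length. It is a sketch, not XrdOucCRC::Calc32C: the function names are hypothetical, and the size of the correction-segment header is passed in as a parameter rather than taken from the protocol headers.

```cpp
#include <cstddef>
#include <cstdint>

// Bitwise CRC-32C: reflected polynomial 0x82F63B78, initial value and
// final XOR of 0xFFFFFFFF. Slow but easy to verify.
inline uint32_t Crc32c( const void *data, size_t len )
{
  const unsigned char *p = static_cast<const unsigned char*>( data );
  uint32_t crc = 0xFFFFFFFFu;
  for( size_t i = 0; i < len; ++i )
  {
    crc ^= p[i];
    for( int b = 0; b < 8; ++b )
      crc = ( crc >> 1 ) ^ ( 0x82F63B78u & ( 0u - ( crc & 1u ) ) );
  }
  return ~crc;
}

// True when the checksum received ahead of the payload matches the payload.
inline bool ChecksumMatches( uint32_t expected, const void *payload, size_t len )
{
  return Crc32c( payload, len ) == expected;
}

// Number of 64-bit page offsets that follow a correction-segment header of
// cseSize bytes in a body of dlen bytes (cseSize is an input of the sketch).
inline size_t PageOffsetCount( size_t dlen, size_t cseSize )
{
  return dlen < cseSize ? 0 : ( dlen - cseSize ) / sizeof( int64_t );
}
```

As in the listing, the checksum has to be computed over the bytes as received, before any field is converted to host byte order.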

Referenced by GetMore().


◆ UnMarshallBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshallBody ( Message * msg,
uint16_t reqType )
static

Unmarshall the body of the incoming message.

Definition at line 1227 of file XrdClXRootDTransport.cc.

1228 {
1229 ServerResponse *m = (ServerResponse *)msg->GetBuffer();
1230
1231 //--------------------------------------------------------------------------
1232 // kXR_ok
1233 //--------------------------------------------------------------------------
1234 if( m->hdr.status == kXR_ok )
1235 {
1236 switch( reqType )
1237 {
1238 //----------------------------------------------------------------------
1239 // kXR_protocol
1240 //----------------------------------------------------------------------
1241 case kXR_protocol:
1242 if( m->hdr.dlen < 8 )
1243 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1244 m->body.protocol.pval = ntohl( m->body.protocol.pval );
1245 m->body.protocol.flags = ntohl( m->body.protocol.flags );
1246 break;
1247 }
1248 }
1249 //--------------------------------------------------------------------------
1250 // kXR_error
1251 //--------------------------------------------------------------------------
1252 else if( m->hdr.status == kXR_error )
1253 {
1254 if( m->hdr.dlen < 4 )
1255 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1256 m->body.error.errnum = ntohl( m->body.error.errnum );
1257 }
1258
1259 //--------------------------------------------------------------------------
1260 // kXR_wait
1261 //--------------------------------------------------------------------------
1262 else if( m->hdr.status == kXR_wait )
1263 {
1264 if( m->hdr.dlen < 4 )
1265 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
1266 m->body.wait.seconds = htonl( m->body.wait.seconds );
1267 }
1268
1269 //--------------------------------------------------------------------------
1270 // kXR_redirect
1271 //--------------------------------------------------------------------------
1272 else if( m->hdr.status == kXR_redirect )
1273 {
1274 if( m->hdr.dlen < 4 )
1275 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1276 m->body.redirect.port = htonl( m->body.redirect.port );
1277 }
1278
1279 //--------------------------------------------------------------------------
1280 // kXR_waitresp
1281 //--------------------------------------------------------------------------
1282 else if( m->hdr.status == kXR_waitresp )
1283 {
1284 if( m->hdr.dlen < 4 )
1285 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1286 m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1287 }
1288
1289 //--------------------------------------------------------------------------
1290 // kXR_attn
1291 //--------------------------------------------------------------------------
1292 else if( m->hdr.status == kXR_attn )
1293 {
1294 if( m->hdr.dlen < 4 )
1295 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1296 m->body.attn.actnum = htonl( m->body.attn.actnum );
1297 }
1298
1299 return XRootDStatus();
1300 }

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), ServerResponse::hdr, kXR_attn, kXR_error, kXR_ok, kXR_protocol, kXR_redirect, kXR_wait, kXR_waitresp, ServerResponseHeader::status, and XrdCl::stError.

Referenced by XrdCl::XRootDMsgHandler::Process().


◆ UnMarshallHeader()

void XrdCl::XRootDTransport::UnMarshallHeader ( Message & msg)
static

Unmarshall the header of the incoming message.

Definition at line 1444 of file XrdClXRootDTransport.cc.

1445 {
1446 ServerResponseHeader *header = (ServerResponseHeader *)msg.GetBuffer();
1447 header->status = ntohs( header->status );
1448 header->dlen = ntohl( header->dlen );
1449 }

References ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), and ServerResponseHeader::status.
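
Header unmarshalling amounts to converting two fields from network (big-endian) to host byte order. The sketch below decodes an 8-byte header from raw bytes without relying on ntohs/ntohl. The layout (two stream-ID bytes, a 16-bit status, a 32-bit length) is assumed from the fields used in the listing, and `ResponseHeader` and `DecodeHeader` are hypothetical names.

```cpp
#include <cstdint>

// Assumed 8-byte layout: streamid[2], status (16 bit), dlen (32 bit).
struct ResponseHeader
{
  uint8_t  streamid[2];
  uint16_t status;
  int32_t  dlen;
};

// Decode a header received in network byte order. The stream ID is kept
// as raw bytes; status and dlen are assembled most significant byte first.
inline ResponseHeader DecodeHeader( const unsigned char raw[8] )
{
  ResponseHeader h;
  h.streamid[0] = raw[0];
  h.streamid[1] = raw[1];
  h.status = static_cast<uint16_t>( ( raw[2] << 8 ) | raw[3] );
  h.dlen   = static_cast<int32_t>( ( uint32_t( raw[4] ) << 24 ) |
                                   ( uint32_t( raw[5] ) << 16 ) |
                                   ( uint32_t( raw[6] ) << 8 )  |
                                     uint32_t( raw[7] ) );
  return h;
}
```

Decoding byte by byte gives the same result on little-endian and big-endian hosts, whereas the listing converts the fields in place inside the message buffer.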

Referenced by GetHeader().


◆ UnMarshallRequest()

XRootDStatus XrdCl::XRootDTransport::UnMarshallRequest ( Message * msg)
static

Unmarshall the request. Requests sometimes need to be rewritten, so they have to be unmarshalled first.

Definition at line 1206 of file XrdClXRootDTransport.cc.

1207 {
1208 if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1209 // We rely on the marshaling process to be symmetric!
1210 // First we unmarshall the request ID and the length because
1211 // MarshallRequest() relies on these, and then we need to unmarshall these
1212 // two again, because they get marshalled in MarshallRequest().
1213 // All this is pretty damn ugly and should be rewritten.
1214 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1215 req->header.requestid = htons( req->header.requestid );
1216 req->header.dlen = htonl( req->header.dlen );
1217 XRootDStatus st = MarshallRequest( msg );
1218 req->header.requestid = htons( req->header.requestid );
1219 req->header.dlen = htonl( req->header.dlen );
1220 msg->SetIsMarshalled( false );
1221 return st;
1222 }

References ClientRequestHdr::dlen, XrdCl::Buffer::GetBuffer(), ClientRequest::header, XrdCl::Message::IsMarshalled(), MarshallRequest(), ClientRequestHdr::requestid, XrdCl::Message::SetIsMarshalled(), XrdCl::stOK, and XrdCl::suAlreadyDone.
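
UnMarshallRequest() reuses MarshallRequest() because byte-order conversion is its own inverse: swapping twice restores the original value, and on a big-endian host both directions are no-ops. A minimal sketch of that property, using hypothetical helpers `Swap16` and `Swap32` in place of htons/htonl:

```cpp
#include <cstdint>

// Reverse the byte order of a 16-bit value.
inline uint16_t Swap16( uint16_t v )
{
  return static_cast<uint16_t>( ( v << 8 ) | ( v >> 8 ) );
}

// Reverse the byte order of a 32-bit value.
inline uint32_t Swap32( uint32_t v )
{
  return ( v << 24 ) | ( ( v & 0x0000FF00u ) << 8 ) |
         ( ( v >> 8 ) & 0x0000FF00u ) | ( v >> 24 );
}

// Applying the same conversion twice is the identity, so the routine that
// marshals a field can also unmarshal it.
inline bool RoundTrips16( uint16_t v ) { return Swap16( Swap16( v ) ) == v; }
inline bool RoundTrips32( uint32_t v ) { return Swap32( Swap32( v ) ) == v; }
```

This symmetry is what the comment in the listing refers to when it says the code relies on the marshalling process being symmetric.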

Referenced by MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().


◆ UnMarshalStatusBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshalStatusBody ( Message & msg,
uint16_t reqType )
static

Unmarshall the body of the status response.

Definition at line 1305 of file XrdClXRootDTransport.cc.

1306 {
1307 //--------------------------------------------------------------------------
1308 // Calculate the crc32c before unmarshalling the body!
1309 //--------------------------------------------------------------------------
1310 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
1311 char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1312 size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1313 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1314
1315 size_t stlen = sizeof( ServerResponseStatus );
1316 switch( reqType )
1317 {
1318 case kXR_pgread:
1319 {
1320 stlen += sizeof( ServerResponseBody_pgRead );
1321 break;
1322 }
1323
1324 case kXR_pgwrite:
1325 {
1326 stlen += sizeof( ServerResponseBody_pgWrite );
1327 break;
1328 }
1329 }
1330
1331 if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1332 "kXR_status: invalid message size." );
1333
1334 rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1335 rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1336
1337 switch( reqType )
1338 {
1339 case kXR_pgread:
1340 {
1341 ServerResponseBody_pgRead *pgrdbdy = (ServerResponseBody_pgRead*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1342 pgrdbdy->offset = ntohll( pgrdbdy->offset );
1343 break;
1344 }
1345
1346 case kXR_pgwrite:
1347 {
1348 ServerResponseBody_pgWrite *pgwrtbdy = (ServerResponseBody_pgWrite*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1349 pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1350 break;
1351 }
1352 }
1353
1354 //--------------------------------------------------------------------------
1355 // Do the integrity checks
1356 //--------------------------------------------------------------------------
1357 if( crcval != rspst->bdy.crc32c )
1358 {
1359 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1360 "corrupted (crc32c integrity check failed)." );
1361 }
1362
1363 if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1364 rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1365 {
1366 return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1367 "(stream ID mismatch)." );
1368 }
1369
1370
1371
1372 if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1373 {
1374 return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1375 "(request ID mismatch)." );
1376 }
1377
1378 return XRootDStatus();
1379 }

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_Status::crc32c, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetSize(), ServerResponseStatus::hdr, kXR_1stRequest, kXR_pgread, kXR_pgwrite, ServerResponseBody_pgRead::offset, ServerResponseBody_pgWrite::offset, ServerResponseBody_Status::requestid, XrdCl::stError, ServerResponseHeader::streamid, and ServerResponseBody_Status::streamID.

Referenced by XrdCl::XRootDMsgHandler::InspectStatusRsp().


◆ WaitBeforeExit()

void XrdCl::XRootDTransport::WaitBeforeExit ( )
virtual

Wait until the program can safely exit.

Implements XrdCl::TransportHandler.

Definition at line 1754 of file XrdClXRootDTransport.cc.

1755 {
1756 XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
1757 pSecUnloadHandler->unloaded = true;
1758 }

References XrdCl::PluginUnloadHandler::lock, and XrdCl::PluginUnloadHandler::unloaded.
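
The pattern behind WaitBeforeExit() is a reader-writer lock guarding an "unloaded" flag: shutdown takes the lock exclusively, which waits for current users to finish, and then sets the flag. The sketch below shows that pattern with the C++17 standard library. `UnloadGuard`, `WaitBeforeExitSketch` and `RunIfLoaded` are hypothetical names, and the reader side is an assumption about how such a flag would typically be consulted; it is not shown on this page.

```cpp
#include <mutex>
#include <shared_mutex>

// Hypothetical stand-in for the unload handler: a lock plus a flag.
struct UnloadGuard
{
  std::shared_mutex lock;
  bool unloaded = false;
};

// Shutdown side: blocks until no reader holds the lock, then marks the
// guarded resource as unloaded.
inline void WaitBeforeExitSketch( UnloadGuard &g )
{
  std::unique_lock<std::shared_mutex> wr( g.lock );
  g.unloaded = true;
}

// User side (assumed): run fn only while the resource is still loaded.
// Returns false without calling fn once shutdown has happened.
template<typename Fn>
inline bool RunIfLoaded( UnloadGuard &g, Fn fn )
{
  std::shared_lock<std::shared_mutex> rd( g.lock );
  if( g.unloaded ) return false;
  fn();
  return true;
}
```

Because readers hold the lock for the whole call, shutdown cannot complete while a call is in flight, and no new call starts afterwards.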

Friends And Related Symbol Documentation

◆ PluginUnloadHandler

friend struct PluginUnloadHandler
friend

Definition at line 432 of file XrdClXRootDTransport.hh.


The documentation for this class was generated from the following files: