XRootD
Loading...
Searching...
No Matches
XrdCl::InQueue Class Reference

A synchronize queue for incoming data. More...

#include <XrdClInQueue.hh>

+ Collaboration diagram for XrdCl::InQueue:

Public Member Functions

void AddMessageHandler (MsgHandler *handler, bool &rmMsg)
 
void AssignTimeout (MsgHandler *handler)
 
MsgHandlerGetHandlerForMessage (std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
 
void ReAddMessageHandler (MsgHandler *handler, time_t expires)
 Re-insert the handler without scanning the cached messages.
 
void RemoveMessageHandler (MsgHandler *handler)
 Remove a listener.
 
void ReportStreamEvent (MsgHandler::StreamEvent event, XRootDStatus status)
 Report an event to the handlers.
 
void ReportTimeout (time_t now=0)
 Timeout handlers.
 

Detailed Description

A synchronize queue for incoming data.

Definition at line 36 of file XrdClInQueue.hh.

Member Function Documentation

◆ AddMessageHandler()

void XrdCl::InQueue::AddMessageHandler ( MsgHandler * handler,
bool & rmMsg )

Add a listener that should be notified about incoming messages. Freshly added handlers have no expire time set and will not trigger the timeout reporting. The expiry is added by AssignTimeout or GetHandlerForMessage.

Parameters
handlermessage handler
rmMsgwill be set to true if a left over message matching the request has been removed from the queue

Definition at line 54 of file XrdClInQueue.cc.

55 {
56 uint16_t handlerSid = handler->GetSid();
57 XrdSysMutexHelper scopedLock( pMutex );
58
59 pHandlers[handlerSid] = HandlerAndExpire( handler, 0 );
60 }

References XrdCl::MsgHandler::GetSid().

Referenced by XrdCl::Stream::OnReadyToWrite().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ AssignTimeout()

void XrdCl::InQueue::AssignTimeout ( MsgHandler * handler)

If the specified handler is in the queue but has not yet had an expiry time assigned, query the handler for the expiry and record it. Expiry will also be assigned by GetHandlerForMessage if not already assigned.

Parameters
handlerhandler to check

Definition at line 192 of file XrdClInQueue.cc.

193 {
194 uint16_t handlerSid = handler->GetSid();
195 XrdSysMutexHelper scopedLock( pMutex );
196 HandlerMap::iterator it = pHandlers.find( handlerSid );
197 if( it != pHandlers.end() )
198 {
199 if( it->second.second == 0 )
200 {
201 it->second.second = handler->GetExpiration();
202
203 Log *log = DefaultEnv::GetLog();
204 log->Debug( ExDbgMsg, "[handler: %p] Assigned expiration %lld.",
205 handler, (long long)it->second.second );
206
207 }
208 }
209 }
static Log * GetLog()
Get default log.
const uint64_t ExDbgMsg
XrdSysError Log
Definition XrdConfig.cc:113

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::MsgHandler::GetExpiration(), XrdCl::DefaultEnv::GetLog(), and XrdCl::MsgHandler::GetSid().

Referenced by XrdCl::Stream::OnMessageSent().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetHandlerForMessage()

MsgHandler * XrdCl::InQueue::GetHandlerForMessage ( std::shared_ptr< Message > & msg,
time_t & expires,
uint16_t & action )

Get a message handler interested in receiving message whose header is stored in msg

Parameters
msgmessage header
expireshandle's expiration timestamp
actionthe action declared by the handler
Returns
handler or 0 if none is interested

Definition at line 66 of file XrdClInQueue.cc.

69 {
70 time_t exp = 0;
71 uint16_t act = 0;
72 uint16_t msgSid = 0;
73 MsgHandler* handler = 0;
74
75 if (DiscardMessage(*msg, msgSid))
76 {
77 return handler;
78 }
79
80 XrdSysMutexHelper scopedLock( pMutex );
81 HandlerMap::iterator it = pHandlers.find(msgSid);
82
83 if (it != pHandlers.end())
84 {
85 Log *log = DefaultEnv::GetLog();
86 handler = it->second.first;
87 act = handler->Examine( msg );
88 if( it->second.second == 0 ) {
89 it->second.second = handler->GetExpiration();
90 log->Debug( ExDbgMsg, "[handler: %p] Assigned expiration %lld.",
91 handler, (long long)it->second.second );
92 }
93 exp = it->second.second;
94 log->Debug( ExDbgMsg, "[msg: %p] Assigned MsgHandler: %p.",
95 msg.get(), handler );
96
97
99 {
100 pHandlers.erase( it );
101 log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
102 handler, handler );
103 }
104 }
105
106 if( handler )
107 {
108 expires = exp;
109 action = act;
110 }
111
112 return handler;
113 }

References XrdCl::Log::Debug(), XrdCl::MsgHandler::Examine(), XrdCl::ExDbgMsg, XrdCl::MsgHandler::GetExpiration(), XrdCl::DefaultEnv::GetLog(), and XrdCl::MsgHandler::RemoveHandler.

Referenced by XrdCl::Stream::InstallIncHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReAddMessageHandler()

void XrdCl::InQueue::ReAddMessageHandler ( MsgHandler * handler,
time_t expires )

Re-insert the handler without scanning the cached messages.

Definition at line 118 of file XrdClInQueue.cc.

120 {
121 uint16_t handlerSid = handler->GetSid();
122 XrdSysMutexHelper scopedLock( pMutex );
123 pHandlers[handlerSid] = HandlerAndExpire( handler, expires );
124 }

References XrdCl::MsgHandler::GetSid().

Referenced by XrdCl::Stream::ForceError(), and XrdCl::Stream::OnError().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RemoveMessageHandler()

void XrdCl::InQueue::RemoveMessageHandler ( MsgHandler * handler)

Remove a listener.

Definition at line 129 of file XrdClInQueue.cc.

130 {
131 uint16_t handlerSid = handler->GetSid();
132 XrdSysMutexHelper scopedLock( pMutex );
133 pHandlers.erase(handlerSid);
134 Log *log = DefaultEnv::GetLog();
135 log->Debug( ExDbgMsg, "[handler: %p] Removed MsgHandler: %p from the in-queue.",
136 handler, handler );
137
138 }

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::DefaultEnv::GetLog(), and XrdCl::MsgHandler::GetSid().

Referenced by XrdCl::Stream::ForceError(), XrdCl::Stream::InspectStatusRsp(), and XrdCl::Stream::OnError().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReportStreamEvent()

void XrdCl::InQueue::ReportStreamEvent ( MsgHandler::StreamEvent event,
XRootDStatus status )

Report an event to the handlers.

Definition at line 143 of file XrdClInQueue.cc.

145 {
146 uint8_t action = 0;
147 XrdSysMutexHelper scopedLock( pMutex );
148 for( HandlerMap::iterator it = pHandlers.begin(); it != pHandlers.end(); )
149 {
150 action = it->second.first->OnStreamEvent( event, status );
151
152 if( action & MsgHandler::RemoveHandler )
153 {
154 auto next = it; ++next;
155 pHandlers.erase( it );
156 it = next;
157 }
158 else
159 ++it;
160 }
161 }

References XrdCl::MsgHandler::RemoveHandler.

Referenced by XrdCl::Stream::ForceError(), and XrdCl::Stream::OnError().

+ Here is the caller graph for this function:

◆ ReportTimeout()

void XrdCl::InQueue::ReportTimeout ( time_t now = 0)

Timeout handlers.

Definition at line 166 of file XrdClInQueue.cc.

167 {
168 if( !now )
169 now = ::time(0);
170
171 XrdSysMutexHelper scopedLock( pMutex );
172 HandlerMap::iterator it = pHandlers.begin();
173 while( it != pHandlers.end() )
174 {
175 if( it->second.second && it->second.second <= now )
176 {
177 uint8_t act = it->second.first->OnStreamEvent( MsgHandler::Timeout,
178 Status( stError, errOperationExpired ) );
179 auto next = it; ++next;
180 if( act & MsgHandler::RemoveHandler )
181 pHandlers.erase( it );
182 it = next;
183 }
184 else
185 ++it;
186 }
187 }
@ Timeout
The declared timeout has occurred.
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.

References XrdCl::errOperationExpired, XrdCl::MsgHandler::RemoveHandler, XrdCl::stError, and XrdCl::MsgHandler::Timeout.

Referenced by XrdCl::Stream::Tick().

+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: