XRootD
Loading...
Searching...
No Matches
XrdFrmXfrQueue Class Reference

#include <XrdFrmXfrQueue.hh>

+ Collaboration diagram for XrdFrmXfrQueue:

Public Member Functions

 XrdFrmXfrQueue ()
 
 ~XrdFrmXfrQueue ()
 

Static Public Member Functions

static int Add (XrdFrcRequest *rP, XrdFrcReqFile *reqF, int theQ)
 
static void Done (XrdFrmXfrJob *xP, const char *Msg)
 
static XrdFrmXfrJob * Get (int ioQType)
 
static int Init ()
 
static void StopMon (void *parg)
 

Static Public Attributes

static const int useAnyQ = 0
 
static const int useInpQ = 1
 
static const int useOutQ = -1
 

Detailed Description

Definition at line 41 of file XrdFrmXfrQueue.hh.

Constructor & Destructor Documentation

◆ XrdFrmXfrQueue()

XrdFrmXfrQueue::XrdFrmXfrQueue ( )
inline

Definition at line 59 of file XrdFrmXfrQueue.hh.

59{}

◆ ~XrdFrmXfrQueue()

XrdFrmXfrQueue::~XrdFrmXfrQueue ( )
inline

Definition at line 60 of file XrdFrmXfrQueue.hh.

60{}

Member Function Documentation

◆ Add()

int XrdFrmXfrQueue::Add ( XrdFrcRequest * rP,
XrdFrcReqFile * reqF,
int theQ )
static

Definition at line 73 of file XrdFrmXfrQueue.cc.

74{
75 XrdFrmXfrJob *xP;
76 struct stat buf;
77 const char *xfrType = xfrName(*rP, qNum);
78 char *Lfn, lclpath[MAXPATHLEN];
79 int Outgoing = (qNum & XrdFrcRequest::outQ);
80
81// Validate queue number
82//
83 if (qNum < 0 || qNum >= XrdFrcRequest::numQ-1)
84 {sprintf(lclpath, "%d", qNum);
85 Say.Emsg("Queue", lclpath, " is an invalid queue; skipping", rP->LFN);
86 if (reqFQ) reqFQ->Del(rP);
87 return 0;
88 }
89
90// First check if this request is active or pending. If it's an inbound request
91// then only the lfn matters regardless of source. For outgoing requests then
92// the lfn plus the target only matters.
93//
94 Lfn = (Outgoing ? rP->LFN : (rP->LFN)+rP->LFO);
95 hMutex.Lock();
96 if ((xP = hTab.Find(Lfn)))
97 {if (rP->Options & (XrdFrcRequest::msgFail | XrdFrcRequest::msgSucc)
98 && strcmp(xP->reqData.Notify, rP->Notify))
99 {XrdOucTList *tP = new XrdOucTList(rP->Notify, 0, xP->NoteList);
100 xP->NoteList = tP;
101 }
102 hMutex.UnLock();
103 if (Config.Verbose || Trace.What & TRACE_Debug)
104 {sprintf(lclpath, " in progress; %s skipped for ", xfrType);
105 Say.Say(0, xP->Type, xP->reqData.LFN, lclpath, rP->User);
106 }
107 if (reqFQ) reqFQ->Del(rP);
108 return 0;
109 }
110 hMutex.UnLock();
111
112// Obtain the local name
113//
114 if (!Config.LocalPath((rP->LFN)+rP->LFO, lclpath, sizeof(lclpath)-16))
115 {if (reqFQ) reqFQ->Del(rP);
116 return Notify(rP, qNum, 1, "Unable to generate pfn");
117 }
118
119// Check if the file exists or not. For incoming requests, the file must not
120// exist. For outgoing requests the file must exist.
121//
122 if (Config.Stat((rP->LFN)+rP->LFO, lclpath, &buf))
123 {if (Outgoing)
124 {if (Config.Verbose || Trace.What & TRACE_Debug)
125 Say.Say(0, xfrType,"skipped; ",lclpath," not resident.");
126 if (reqFQ) reqFQ->Del(rP);
127 return Notify(rP, qNum, 2, "file not resident");
128 }
129 } else {
130 if (!Outgoing)
131 {if (Config.Verbose || Trace.What & TRACE_Debug)
132 Say.Say(0, xfrType, "skipped; ", lclpath, " exists.");
133 if (reqFQ) reqFQ->Del(rP);
134 return Notify(rP, qNum, 0);
135 }
136 }
137
138// Obtain a queue slot, we may block until one is available
139//
140 do {qMutex.Lock();
141 if ((xP = xfrQ[qNum].Free)) break;
142 qMutex.UnLock();
143 xfrQ[qNum].Avail.Wait();
144 } while(!xP);
145 xfrQ[qNum].Free = xP->Next;
146 qMutex.UnLock();
147
148// Initialize the slot
149//
150 xP->Next = 0;
151 xP->NoteList = 0;
152 xP->reqFQ = reqFQ;
153 xP->reqData = *rP;
154 xP->reqFile = (Outgoing ? xP->reqData.LFN : (xP->reqData.LFN)+rP->LFO);
155 strcpy(xP->PFN, lclpath);
156 xP->pfnEnd = strlen(lclpath);
157 xP->RetCode = 0;
158 xP->qNum = qNum;
159 xP->Act =*xfrType;
160 xP->Type = xfrType+1;
161
162// Add this to the table of requests
163//
164 hMutex.Lock();
165 hTab.Add(xP->reqFile, xP, 0, Hash_keep);
166 hMutex.UnLock();
167
168// Place request in the appropriate transfer queue
169//
170 qMutex.Lock();
171 if (xfrQ[qNum].Last) {xfrQ[qNum].Last->Next = xP; xfrQ[qNum].Last = xP;}
172 else xfrQ[qNum].Last = xfrQ[qNum].First = xP;
173 qMutex.UnLock();
174 qReady.Post();
175
176// All done
177//
178 return 1;
179}
XrdOucPup XrdCmsParser::Pup & Say
#define TRACE_Debug
XrdOucTList * NoteList
char PFN[MAXPATHLEN+16]
const char * Type
XrdFrmXfrJob * Next
XrdFrcRequest reqData
XrdFrcReqFile * reqFQ
@ Hash_keep
Definition XrdOucHash.hh:55
#define stat(a, b)
Definition XrdPosix.hh:101
char LFN[3072]
static const int msgFail
static const int msgSucc
static const int outQ
static const int numQ
char Notify[512]
int Stat(const char *xLfn, const char *xPfn, struct stat *buff)
int LocalPath(const char *oldp, char *newp, int newpsz)
T * Add(const char *KeyVal, T *KeyData, const int LifeTime=0, XrdOucHash_Options opt=Hash_default)
T * Find(const char *KeyVal, time_t *KeyTime=0)
XrdOucTrace Trace
XrdFrmConfig Config

References XrdFrmXfrJob::Act, XrdOucHash< T >::Add(), XrdFrm::Config, XrdFrcReqFile::Del(), XrdOucHash< T >::Find(), Hash_keep, XrdFrcRequest::LFN, XrdFrcRequest::LFO, XrdFrmConfig::LocalPath(), XrdSysMutex::Lock(), XrdFrcRequest::msgFail, XrdFrcRequest::msgSucc, XrdFrmXfrJob::Next, XrdFrmXfrJob::NoteList, XrdFrcRequest::Notify, XrdFrcRequest::numQ, XrdFrcRequest::Options, XrdFrcRequest::outQ, XrdFrmXfrJob::PFN, XrdFrmXfrJob::pfnEnd, XrdSysSemaphore::Post(), XrdFrmXfrJob::qNum, XrdFrmXfrJob::reqData, XrdFrmXfrJob::reqFile, XrdFrmXfrJob::reqFQ, XrdFrmXfrJob::RetCode, Say, stat, XrdFrmConfig::Stat(), XrdFrc::Trace, TRACE_Debug, XrdFrmXfrJob::Type, XrdSysMutex::UnLock(), XrdFrcRequest::User, XrdFrmConfig::Verbose, XrdSysSemaphore::Wait(), and XrdOucTrace::What.

Referenced by XrdFrmReqBoss::Process(), and XrdFrmMigrate::Queue().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Done()

void XrdFrmXfrQueue::Done ( XrdFrmXfrJob * xP,
const char * Msg )
static

Definition at line 185 of file XrdFrmXfrQueue.cc.

186{
187 XrdOucTList *tP;
188
189// Send notifications to everyone that wants it that this job is done
190//
191 do {Notify(&(xP->reqData), xP->qNum, xP->RetCode, Msg);
192 if ((tP = xP->NoteList))
193 {strcpy(xP->reqData.Notify, tP->text);
194 xP->NoteList = tP->next;
195 delete tP;
196 }
197 } while(tP);
198
199// Remove this job from the queue file
200//
201 if (xP->reqFQ) xP->reqFQ->Del(&(xP->reqData));
202
203// Remove this job from the active table
204//
205 hMutex.Lock(); hTab.Del(xP->reqFile); hMutex.UnLock();
206
207// Place job element on the free queue
208//
209 qMutex.Lock();
210 xP->Next = xfrQ[xP->qNum].Free;
211 xfrQ[xP->qNum].Free = xP;
212 xfrQ[xP->qNum].Avail.Post();
213 qMutex.UnLock();
214}
void Del(XrdFrcRequest *rP)
int Del(const char *KeyVal, XrdOucHash_Options opt=Hash_default)
XrdOucTList * next

References XrdOucHash< T >::Del(), XrdFrcReqFile::Del(), XrdSysMutex::Lock(), XrdFrmXfrJob::Next, XrdOucTList::next, XrdFrmXfrJob::NoteList, XrdFrcRequest::Notify, XrdSysSemaphore::Post(), XrdFrmXfrJob::qNum, XrdFrmXfrJob::reqData, XrdFrmXfrJob::reqFile, XrdFrmXfrJob::reqFQ, XrdFrmXfrJob::RetCode, XrdOucTList::text, and XrdSysMutex::UnLock().

Referenced by XrdFrmTransfer::Start().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Get()

XrdFrmXfrJob * XrdFrmXfrQueue::Get ( int ioQType)
static

Definition at line 220 of file XrdFrmXfrQueue.cc.

221{
222 XrdFrmXfrJob *xfrP;
223
224// Wait for an available job and return it
225//
226 do {qReady.Wait();} while(!(xfrP = Pull(ioQType)));
227 return xfrP;
228}

References XrdSysSemaphore::Wait().

Referenced by XrdFrmTransfer::Start().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Init()

int XrdFrmXfrQueue::Init ( )
static

Definition at line 239 of file XrdFrmXfrQueue.cc.

240{
241 static const char *StopFN[] = {"STAGE", "MIGR", "COPYIN", "COPYOUT"};
242 static const char *StopQN[] = {"stage", "migr", "copyin", "copyout"};
243 XrdFrmXfrJob *xP;
244 pthread_t tid;
245 char StopFile[1024], *fnSfx;
246 int n, qNum, retc;
247
248// Prepare to initialize the queues
249//
250 strcpy(StopFile, Config.AdminPath);
251 strcat(StopFile, "STOP");
252 fnSfx = StopFile + strlen(StopFile);
253
254// Initialize each queue
255//
256 for (qNum= 0; qNum < XrdFrcRequest::numQ-1; qNum++)
257 {
258
259 // Initialize the stop file name and set the queue name and number
260 //
261 strcpy(fnSfx, StopFN[qNum]);
262 xfrQ[qNum].File = strdup(StopFile);
263 xfrQ[qNum].Name = StopQN[qNum];
264 xfrQ[qNum].qNum = qNum;
265
266 // Start the stop file monitor thread for this queue
267 //
268 if ((retc = XrdSysThread::Run(&tid, InitStop, (void *)&xfrQ[qNum],
269 XRDSYSTHREAD_BIND, "Stopfile monitor")))
270 {Say.Emsg("main", retc, "create stopfile thread"); return 0;}
271
272 // Create twice as many free queue elements as we have xfr agents for the
273 // queue. This prevents stalls when a particular queue is stopped but keeps
274 // us from exceeding internal resources when we get flooded with requests.
275 //
276 n = Config.xfrMax*2;
277 while(n--)
278 {xP = new XrdFrmXfrJob;
279 xP->Next = xfrQ[qNum].Free;
280 xfrQ[qNum].Free = xP;
281 xfrQ[qNum].Avail.Post();
282 }
283 }
284
285// All done
286//
287 return 1;
288}
void * InitStop(void *parg)
#define XRDSYSTHREAD_BIND
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)

References XrdFrmConfig::AdminPath, XrdFrm::Config, InitStop(), XrdFrmXfrJob::Next, XrdFrcRequest::numQ, XrdSysSemaphore::Post(), XrdSysThread::Run(), Say, XrdFrmConfig::xfrMax, and XRDSYSTHREAD_BIND.

Referenced by XrdFrmTransfer::Init().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ StopMon()

void XrdFrmXfrQueue::StopMon ( void * parg)
static

Definition at line 438 of file XrdFrmXfrQueue.cc.

439{
440 struct theQueue *monQ = (struct theQueue *)parg;
441 XrdFrmXfrJob *xP;
442 struct stat buf;
443 char theMsg[80];
444 int Cnt;
445
446// Establish which message to produce
447//
448 sprintf(theMsg, "exists; %s transfers suspended.", monQ->Name);
449
450// Wait until someone needs to tell us to check for a stop file
451//
452 while(1)
453 {monQ->Alert.Wait();
454 Cnt = 0;
455 while(!stat(monQ->File, &buf))
456 {if (!Cnt--) {Say.Emsg("StopMon", monQ->File, theMsg); Cnt = 12;}
457 XrdSysTimer::Snooze(5);
458 }
459 qMutex.Lock();
460 monQ->Stop = 0;
461 xP = monQ->First;
462 while(xP) {qReady.Post(); xP = xP->Next;}
463 qMutex.UnLock();
464 }
465}
static void Snooze(int seconds)

References XrdSysMutex::Lock(), XrdFrmXfrJob::Next, XrdSysSemaphore::Post(), Say, XrdSysTimer::Snooze(), stat, XrdSysMutex::UnLock(), and XrdSysSemaphore::Wait().

Referenced by InitStop().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Data Documentation

◆ useAnyQ

const int XrdFrmXfrQueue::useAnyQ = 0
static

Definition at line 50 of file XrdFrmXfrQueue.hh.

Referenced by XrdFrmTransfer::Init().

◆ useInpQ

const int XrdFrmXfrQueue::useInpQ = 1
static

Definition at line 49 of file XrdFrmXfrQueue.hh.

Referenced by XrdFrmTransfer::Init().

◆ useOutQ

const int XrdFrmXfrQueue::useOutQ = -1
static

Definition at line 51 of file XrdFrmXfrQueue.hh.

Referenced by XrdFrmTransfer::Init().


The documentation for this class was generated from the following files: