XRootD
Loading...
Searching...
No Matches
XrdOfsTPCProg Class Reference

#include <XrdOfsTPCProg.hh>

+ Collaboration diagram for XrdOfsTPCProg:

Public Member Functions

 XrdOfsTPCProg (XrdOfsTPCProg *Prev, int num, int errMon)
 
 ~XrdOfsTPCProg ()
 
void Cancel ()
 
void Run ()
 
int Xeq (bool &isIPv4)
 

Static Public Member Functions

static int Init ()
 
static XrdOfsTPCProg * Start (XrdOfsTPCJob *jP, int &rc)
 

Detailed Description

Definition at line 40 of file XrdOfsTPCProg.hh.

Constructor & Destructor Documentation

◆ XrdOfsTPCProg()

XrdOfsTPCProg::XrdOfsTPCProg ( XrdOfsTPCProg * Prev,
int num,
int errMon )

Definition at line 122 of file XrdOfsTPCProg.cc.

123 : Prog(&OfsEroute, errMon), // program runner reports through OfsEroute; errMon is passed straight through
124 JobStream(&OfsEroute), // stream that Xeq() later reads the copy program's output lines from
125 Next(Prev), Job(0) // link in front of Prev (Init() passes the current pgmIdle head); no job assigned yet
126 {snprintf(Pname, sizeof(Pname), "TPC job %d: ", num); // label that Xeq() prefixes to its Say()/DEBUG output
127 Pname[sizeof(Pname)-1] = 0; // explicit terminator on the last byte of Pname
128 }
XrdSysError OfsEroute(0)

Referenced by Init().

+ Here is the caller graph for this function:

◆ ~XrdOfsTPCProg()

XrdOfsTPCProg::~XrdOfsTPCProg ( )
inline

Definition at line 57 of file XrdOfsTPCProg.hh.

57{} // empty body: the destructor performs no explicit work of its own

Member Function Documentation

◆ Cancel()

void XrdOfsTPCProg::Cancel ( )
inline

Definition at line 44 of file XrdOfsTPCProg.hh.

44{JobStream.Drain();} // forwards to XrdOucStream::Drain() on JobStream, the stream Xeq() reads program output from

References XrdOucStream::Drain().

Referenced by XrdOfsTPCJob::Del().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Init()

int XrdOfsTPCProg::Init ( )
static

Definition at line 167 of file XrdOfsTPCProg.cc.

168{
169 int n;
170
171// Allocate Cfg.xfrMax copy program objects, pushing each one onto the front of
172// the pgmIdle list. A non-zero return from Prog.Setup() stops here and returns 0.
173 for (n = 0; n < Cfg.xfrMax; n++)
174 {pgmIdle = new XrdOfsTPCProg(pgmIdle, n, Cfg.errMon);
175 if (pgmIdle->Prog.Setup(Cfg.XfrProg, &OfsEroute)) return 0;
176 }
177
178// All done. Echoing is turned on whenever GTRACE(debug) is true, in addition to
179// an already set Cfg.doEcho. Return 1 once every program object has been set up.
180 Cfg.doEcho = Cfg.doEcho || GTRACE(debug);
181 return 1;
182}
#define GTRACE(act)
XrdOfsTPCProg(XrdOfsTPCProg *Prev, int num, int errMon)
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
XrdOfsTPCConfig Cfg
Definition XrdOfsTPC.cc:85

References XrdOfsTPCProg(), XrdOfsTPCParms::Cfg, XrdOfsTPCConfig::doEcho, XrdOfsTPCConfig::errMon, GTRACE, OfsEroute, XrdOucProg::Setup(), XrdOfsTPCConfig::xfrMax, and XrdOfsTPCConfig::XfrProg.

Referenced by XrdOfsTPC::Start().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Run()

void XrdOfsTPCProg::Run ( )

Definition at line 188 of file XrdOfsTPCProg.cc.

189{
191 struct stat Stat;
192 const char *clID, *at;
193 char *questSrc, *questLfn, *questDst;
194 int rc;
195 bool isIPv4, doMon = Cfg.tpcMon != 0; // doMon: a TPC monitor object is configured
196 char clBuff[592];
197
198// Run the current job, report its ending status, and possibly pick up
199// another job to run. Note "Job" will always be valid.
200//
201do{if (doMon)
202 {monInfo.Init();
203 gettimeofday(&monInfo.begT, 0);
204 }
205 isIPv4 = false; // Xeq() can return before assigning it; never report an indeterminate or stale flag
206 rc = Xeq(isIPv4);
207
208 if (doMon)
209 {gettimeofday(&monInfo.endT, 0);
210 if ((questSrc = index(Job->Info.Key, '?'))) *questSrc = 0; // cut at '?' while reporting; restored at 233
211 monInfo.srcURL = Job->Info.Key;
212 if ((questLfn = index(Job->Info.Lfn, '?'))) *questLfn = 0; // cut at '?' while reporting; restored at 232
213 monInfo.dstURL = Job->Info.Lfn;
214 monInfo.endRC = rc;
215 if (Job->Info.Str) monInfo.strm = Job->Info.Str;
216 if (isIPv4) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
217
218 clID = Job->Info.Org;
219 if (clID && (at = index(clID, '@')) && !index(at+1, '.')) // nothing after '@' contains a '.'
220 {const char *dName = XrdNetIdentity::Domain();
221 if (dName)
222 {snprintf(clBuff, sizeof(clBuff), "%s%s", clID, dName); // report the ID with the domain appended
223 clID = clBuff;
224 }
225 }
226 monInfo.clID = clID;
227
228 if ((questDst = index(Job->Info.Dst, '?'))) *questDst = 0; // Stat() is given the path without the '?' part
229 if (!XrdOfsOss->Stat(Job->Info.Dst, &Stat)) monInfo.fSize = Stat.st_size;
230 if (questDst) *questDst = '?';
231 Cfg.tpcMon->Report(monInfo);
232 if (questLfn) *questLfn = '?';
233 if (questSrc) *questSrc = '?';
234 }
235
236 Job = Job->Done(this, eRec, rc); // hand over the result; a non-null return is the next job to run
237
238 } while(Job);
239
240// No more jobs to run. Place us on the idle queue. Upon return this thread
241// will end.
242//
243 pgmMutex.Lock();
244 Next = pgmIdle;
245 pgmIdle = this;
246 pgmMutex.UnLock();
247}
struct stat Stat
Definition XrdCks.cc:49
XrdOss * XrdOfsOss
Definition XrdOfs.cc:163
#define stat(a, b)
Definition XrdPosix.hh:101
static const char * Domain(const char **eText=0)
XrdOfsTPCJob * Done(XrdOfsTPCProg *pgmP, const char *eTxt, int rc)
int Xeq(bool &isIPv4)
XrdOfsTPCInfo Info
Definition XrdOfsTPC.hh:109
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
void Report(TpcInfo &info)
XrdXrootdTpcMon * tpcMon

References XrdXrootdTpcMon::TpcInfo::begT, XrdOfsTPCParms::Cfg, XrdXrootdTpcMon::TpcInfo::clID, XrdNetIdentity::Domain(), XrdOfsTPCJob::Done(), XrdOfsTPCInfo::Dst, XrdXrootdTpcMon::TpcInfo::dstURL, XrdXrootdTpcMon::TpcInfo::endRC, XrdXrootdTpcMon::TpcInfo::endT, XrdXrootdTpcMon::TpcInfo::fSize, XrdOfsTPC::Info, XrdXrootdTpcMon::TpcInfo::Init(), XrdXrootdTpcMon::TpcInfo::isIPv4, XrdOfsTPCInfo::Key, XrdOfsTPCInfo::Lfn, XrdSysMutex::Lock(), XrdXrootdTpcMon::TpcInfo::opts, XrdOfsTPCInfo::Org, XrdXrootdTpcMon::Report(), XrdXrootdTpcMon::TpcInfo::srcURL, Stat, stat, XrdOss::Stat(), XrdOfsTPCInfo::Str, XrdXrootdTpcMon::TpcInfo::strm, XrdOfsTPCConfig::tpcMon, XrdSysMutex::UnLock(), Xeq(), and XrdOfsOss.

Referenced by XrdOfsTPCProgRun().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Start()

XrdOfsTPCProg * XrdOfsTPCProg::Start ( XrdOfsTPCJob * jP,
int & rc )
static

Definition at line 253 of file XrdOfsTPCProg.cc.

254{
255 XrdSysMutexHelper pgmMon(&pgmMutex); // NOTE(review): presumably keeps pgmMutex locked until scope exit - confirm
256 XrdOfsTPCProg *pgmP;
257 pthread_t tid;
258
259// Get a new program object, if none left, tell the caller to try later
260// (null return with rc set to 0).
261 if (!(pgmP = pgmIdle)) {rc = 0; return 0;}
262 pgmP->Job = jP;
263
264// Start a thread to run the job. On failure return null with rc holding the
265// value returned by XrdSysThread::Run(); the object stays on the idle list.
266 if ((rc = XrdSysThread::Run(&tid, XrdOfsTPCProgRun, (void *)pgmP, 0,
267 "TPC job")))
268 return 0;
269
270// We are all set, take the program off the idle list and return it
271//
272 pgmIdle = pgmP->Next;
273 return pgmP;
274}
void * XrdOfsTPCProgRun(void *pp)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)

References XrdSysThread::Run(), and XrdOfsTPCProgRun().

Referenced by XrdOfsTPCJob::Sync().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Xeq()

int XrdOfsTPCProg::Xeq ( bool & isIPv4)

Definition at line 280 of file XrdOfsTPCProg.cc.

281{
282 EPNAME("Xeq");
283 credFile cFile(Job);
284 const char *Args[6], *eVec[6], **envArg; // Args: at most -C val, -S val, src, dst; eVec: at most 5 entries + null
285 char *lP, *Colon, *cksVal, sBuff[8], *tident = Job->Info.Org;
286 char *Quest = index(Job->Info.Key, '?');
287 int i, rc, aNum = 0;
288 isIPv4 = false; // assign the out-parameter up front so the early returns below never leave it unset
289// If we have credentials, write them out to a file
290//
291 if (cFile.Path && (rc = ExportCreds(cFile.Path)))
292 {strcpy(eRec, "Copy failed; unable to pass credentials.");
293 return rc;
294 }
295
296// Echo out what we are doing if so desired
297//
298 if (Cfg.doEcho)
299 {if (Quest) *Quest = 0; // show the source without its '?' part; restored right after
300 OfsEroute.Say(Pname,tident," copying ",Job->Info.Key," to ",Job->Info.Dst);
301 if (Quest) *Quest = '?';
302 }
303
304// Determine checksum option
305//
306 cksVal = (Job->Info.Cks ? Job->Info.Cks : Cfg.cksType); // job value wins over the configured one
307 if (cksVal)
308 {Args[aNum++] = "-C";
309 Args[aNum++] = cksVal;
310 }
311
312// Set streams option if need be
313//
314 if (Job->Info.Str)
315 {snprintf(sBuff, sizeof(sBuff), "%d", static_cast<int>(Job->Info.Str)); // bounded by the buffer size
316 Args[aNum++] = "-S";
317 Args[aNum++] = sBuff;
318 }
319
320// Set remaining arguments
321//
322 Args[aNum++] = Job->Info.Key;
323 Args[aNum++] = Job->Info.Dst;
324
325// Always export the trace identifier of the original issuer
326//
327 char tidBuff[512];
328 snprintf(tidBuff, sizeof(tidBuff), "XRD_TIDENT=%s", tident);
329 eVec[0] = tidBuff;
330 envArg = eVec;
331 i = 1;
332
333// Export source protocol if present
334//
335 char sprBuff[128];
336 if (Job->Info.Spr)
337 {snprintf(sprBuff, sizeof(sprBuff), "XRDTPC_SPROT=%s", Job->Info.Spr);
338 eVec[i++] = sprBuff;
339 }
340
341// Export target protocol if present
342//
343 char tprBuff[128];
344 if (Job->Info.Tpr)
345 {snprintf(tprBuff, sizeof(tprBuff), "XRDTPC_TPROT=%s", Job->Info.Tpr);
346 eVec[i++] = tprBuff;
347 }
348
349// If we need to reproxy, export the path
350//
351 char rpxBuff[1024];
352 if (Job->Info.Rpx)
353 {snprintf(rpxBuff, sizeof(rpxBuff), "XRD_CPTARGET=%s", Job->Info.Rpx);
354 eVec[i++] = rpxBuff;
355 }
356
357// Determine if credentials are being passed. If so, pass where they are.
358//
359 if (cFile.Path) eVec[i++] = cFile.pEnv;
360 eVec[i] = 0; // null entry ends the environment vector
361
362// Start the job.
363//
364 if ((rc = Prog.Run(&JobStream, Args, aNum, envArg)))
365 {strcpy(eRec, "Copy failed; unable to start job.");
366 OfsEroute.Emsg("TPC", Job->Info.Org, Job->Info.Lfn, eRec);
367 return rc;
368 }
369
370// Now we drain the output looking for an end of run line. This line should
371// be printed as an error message should the copy fail.
372//
373 *eRec = 0;
374
375 while((lP = JobStream.GetLine()))
376 {if (!strcmp(lP, "!-!IPv4")) isIPv4 = true;
377 if ((Colon = index(lP, ':')) && *(Colon+1) == ' ') // text after the line's first ": "; later lines overwrite
378 {strncpy(eRec, Colon+2, sizeof(eRec)-1);
379 eRec[sizeof(eRec)-1] = 0;
380 }
381 if (Cfg.doEcho && *lP) OfsEroute.Say(Pname, lP);
382 }
383
384// The job has completed. So, we must get the ending status.
385//
386 if ((rc = Prog.RunDone(JobStream)) < 0) rc = -rc; // fold a negative status into a positive code
387 DEBUG(Pname <<"ended with rc=" <<rc);
388
389// Check if we should generate a message
390//
391 if (rc && !(*eRec)) snprintf(eRec, sizeof(eRec), "Copy failed with return code %d", rc);
392
393// Log failures and optionally remove the file (Info would do that as well
394// but much later on, so we do it now).
395//
396 if (rc)
397 {OfsEroute.Emsg("TPC", Job->Info.Org, Job->Info.Lfn, eRec);
398 if (Cfg.autoRM) XrdOfsOss->Unlink(Job->Info.Lfn);
399 } else Job->Info.Success();
400
401// All done
402//
403 return rc;
404}
#define tident
#define DEBUG(x)
#define EPNAME(x)
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
int RunDone(XrdOucStream &cmd) const
int Run(XrdOucStream *Sp, const char *argV[], int argc=0, const char *envV[]=0) const
char * GetLine()
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)

References XrdOfsTPCConfig::autoRM, XrdOfsTPCParms::Cfg, XrdOfsTPCInfo::Cks, XrdOfsTPCConfig::cksType, DEBUG, XrdOfsTPCConfig::doEcho, XrdOfsTPCInfo::Dst, XrdSysError::Emsg(), EPNAME, XrdOucStream::GetLine(), XrdOfsTPC::Info, XrdOfsTPCInfo::Key, XrdOfsTPCInfo::Lfn, OfsEroute, XrdOfsTPCInfo::Org, XrdOfsTPCInfo::Rpx, XrdOucProg::Run(), XrdOucProg::RunDone(), XrdSysError::Say(), XrdOfsTPCInfo::Spr, XrdOfsTPCInfo::Str, XrdOfsTPCInfo::Success(), tident, XrdOfsTPCInfo::Tpr, XrdOss::Unlink(), and XrdOfsOss.

Referenced by Run().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: