XRootD
Loading...
Searching...
No Matches
XrdOssStage.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O s s S t a g e . c c */
4/* */
5/* */
6/* (C) 2006 by the Board of Trustees of the Leland Stanford, Jr., University */
7/* All Rights Reserved */
8/* Produced by Andrew Hanushevsky for Stanford University under contract */
9/* DE-AC02-76-SFO0515 with the Deprtment of Energy */
10/* */
11/* This file is part of the XRootD software suite. */
12/* */
13/* XRootD is free software: you can redistribute it and/or modify it under */
14/* the terms of the GNU Lesser General Public License as published by the */
15/* Free Software Foundation, either version 3 of the License, or (at your */
16/* option) any later version. */
17/* */
18/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
19/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
20/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
21/* License for more details. */
22/* */
23/* You should have received a copy of the GNU Lesser General Public License */
24/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
25/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
26/* */
27/* The copyright holder's institutional names and contributor's names may not */
28/* be used to endorse or promote products derived from this software without */
29/* specific prior written permission of the institution or contributor. */
30/******************************************************************************/
31
32/* The XrdOssStage() routine is responsible for getting data from a remote
33 location to the local filesystem. The current implementation invokes a
34 shell script to perform the "staging".
35
36 This routine is thread-safe if compiled with:
37 AIX: -D_THREAD_SAFE
38 SUN: -D_REENTRANT
39*/
40
41#include <unistd.h>
42#include <cerrno>
43#include <strings.h>
44#include <signal.h>
45#include <cstdio>
46#include <ctime>
47#include <sys/param.h>
48#include <sys/stat.h>
49#include <sys/stat.h>
50#include <sys/wait.h>
51
54#include "XrdOss/XrdOssApi.hh"
55#include "XrdOss/XrdOssError.hh"
57#include "XrdOss/XrdOssStage.hh"
58#include "XrdOuc/XrdOuca2x.hh"
59#include "XrdOuc/XrdOucEnv.hh"
60#include "XrdOuc/XrdOucMsubs.hh"
62#include "XrdOuc/XrdOucProg.hh"
63#include "XrdOuc/XrdOucReqID.hh"
64#include "XrdFrc/XrdFrcProxy.hh"
65
66/******************************************************************************/
67/* G l o b a l s a n d S t a t i c O b j e c t s */
68/******************************************************************************/
69
71
75
76#define XRDOSS_FAIL_FILE (char *)".fail"
77
78/******************************************************************************/
79/* E x t e r n a l F u n c t i o n s */
80/******************************************************************************/
81
82extern unsigned long XrdOucHashVal(const char *KeyVal);
83
84int XrdOssScrubScan(const char *key, char *cip, void *xargp) {return 0;}
85
86/******************************************************************************/
87/* o o s s _ F i n d _ P r t y */
88/******************************************************************************/
89
90int XrdOssFind_Prty(XrdOssStage_Req *req, void *carg)
91{
92 int prty = *(int *)carg;
93 return (req->prty >= prty);
94}
95
96/******************************************************************************/
97/* o o s s _ F i n d _ R e q */
98/******************************************************************************/
99
100int XrdOssFind_Req(XrdOssStage_Req *req, void *carg)
101{
102 XrdOssStage_Req *xreq = (XrdOssStage_Req *)carg;
103 return (req->hash == xreq->hash) && !strcmp(req->path, xreq->path);
104}
105
106/******************************************************************************/
107/* S t a g e */
108/******************************************************************************/
109
110int XrdOssSys::Stage(const char *Tid, const char *fn, XrdOucEnv &env,
111 int Oflag, mode_t Mode, unsigned long long Popts)
112{
113// Use the appropriate method here: queued staging or real-time staging
114//
115 return (StageRealTime ? Stage_RT(Tid, fn, env, Popts)
116 : Stage_QT(Tid, fn, env, Oflag, Mode));
117}
118
119/******************************************************************************/
120/* S t a g e _ Q T */
121/******************************************************************************/
122
123int XrdOssSys::Stage_QT(const char *Tid, const char *fn, XrdOucEnv &env,
124 int Oflag, mode_t Mode)
125{
126 static XrdOucReqID ReqID;
127 static XrdSysMutex PTMutex;
128 static XrdOucHash<char> PTable;
129 static time_t nextScrub = xfrkeep + time(0);
130 char *Found, *pdata[XrdOucMsubs::maxElem + 2];
131 int pdlen[XrdOucMsubs::maxElem + 2];
132 time_t cTime, mTime, tNow = time(0);
133
134// If there is a fail file and the error occurred within the hold time,
135// fail the request. Otherwise, try it again. This avoids tight loops.
136//
137 if ((cTime = HasFile(fn, XRDOSS_FAIL_FILE, &mTime))
138 && xfrhold && (tNow - cTime) < xfrhold)
139 return (mTime != 2 ? -XRDOSS_E8009 : -ENOENT);
140
141// If enough time has gone by between the last scrub, do it now
142//
143 if (nextScrub < tNow)
144 {PTMutex.Lock();
145 if (nextScrub < tNow)
146 {PTable.Apply(XrdOssScrubScan, (void *)0);
147 nextScrub = xfrkeep + tNow;
148 }
149 PTMutex.UnLock();
150 }
151
152// Check if this file is already being brought in. If so, return calculated
153// wait time for this file.
154//
155 PTMutex.Lock();
156 Found = PTable.Add(fn, 0, xfrkeep, Hash_data_is_key);
157 PTMutex.UnLock();
158 if (Found) return CalcTime();
159
160// Check if we should use our built-in frm interface
161//
162 if (StageFrm)
163 {char idbuff[64];
164 ReqID.ID(idbuff, sizeof(idbuff));
165 int n;
166 return (n = StageFrm->Add('+', fn, env.Env(n), Tid, idbuff,
168 }
169
170// If a stagemsg template was not defined; use our default template
171//
172 if (!StageSnd)
173 {char idbuff[64], usrbuff[512];
174 ReqID.ID(idbuff, sizeof(idbuff));
175 if (!StageFormat)
176 {pdata[0] = (char *)"+ "; pdlen[0] = 2;}
177else {pdlen[0] = getID(Tid,env,usrbuff,sizeof(usrbuff)); pdata[0] = usrbuff;}
178 pdata[1] = idbuff; pdlen[1] = strlen(idbuff); // Request ID
179 pdata[2] = (char *)" "; pdlen[2] = 1;
180 pdata[3] = StageEvents; pdlen[3] = StageEvSize; // notification
181 pdata[4] = (char *)" "; pdlen[4] = 1;
182 pdata[5] = (char *)"0 "; pdlen[5] = 2; // prty
183 pdata[6] = StageAction; pdlen[6] = StageActLen; // action
184 pdata[7] = (char *)fn; pdlen[7] = strlen(fn);
185 pdata[8] = (char *)"\n"; pdlen[8] = 1;
186 pdata[9] = 0; pdlen[9] = 0;
187 if (StageProg->Feed((const char **)pdata, pdlen)) return -XRDOSS_E8025;
188 } else {
189 XrdOucMsubsInfo Info(Tid, &env, lcl_N2N, fn, 0,
190 Mode, Oflag, StageAction, "n/a");
191 int k = StageSnd->Subs(Info, pdata, pdlen);
192 pdata[k] = (char *)"\n"; pdlen[k++] = 1;
193 pdata[k] = 0; pdlen[k] = 0;
194 if (StageProg->Feed((const char **)pdata, pdlen)) return -XRDOSS_E8025;
195 }
196
197// All done
198//
199 return CalcTime();
200}
201
202/******************************************************************************/
203/* S t a g e _ R T */
204/******************************************************************************/
205
206int XrdOssSys::Stage_RT(const char *Tid, const char *fn, XrdOucEnv &env,
207 unsigned long long Popts)
208{
209 extern int XrdOssFind_Prty(XrdOssStage_Req *req, void *carg);
211 XrdOssStage_Req req, *newreq, *oldreq;
212 struct stat statbuff;
213 extern int XrdOssFind_Req(XrdOssStage_Req *req, void *carg);
214 char actual_path[MAXPATHLEN+1], *remote_path;
215 char *val;
216 int rc, prty;
217
218// If there is no stagecmd then return an error
219//
220 if (!StageCmd) return -XRDOSS_E8006;
221
222// Set up the minimal new request structure
223//
224 req.hash = XrdOucHashVal(fn);
225 req.path = strdup(fn);
226
227// Check if this file is already being brought in. If it's in the chain but
228// has an error associated with it. If the error window is still in effect,
229// check if a fail file exists. If one does exist, fail the request. If it
230// doesn't exist or if the window has expired, delete the error element and
231// retry the request. This keeps us from getting into tight loops.
232//
233 if ((oldreq = XrdOssStage_Req::StageQ.fullList.Apply(XrdOssFind_Req,(void *)&req)))
234 {if (!(oldreq->flags & XRDOSS_REQ_FAIL)) return CalcTime(oldreq);
235 if (oldreq->sigtod > time(0) && HasFile(fn, XRDOSS_FAIL_FILE))
236 return (oldreq->flags & XRDOSS_REQ_ENOF ? -ENOENT : -XRDOSS_E8009);
237 delete oldreq;
238 }
239
240// Generate remote path
241//
242 if (rmt_N2N)
243 if ((rc = rmt_N2N->lfn2rfn(fn, actual_path, sizeof(actual_path))))
244 return rc;
245 else remote_path = actual_path;
246 else remote_path = (char *)fn;
247
248// Obtain the size of this file, if possible. Note that an exposure exists in
249// that a request for the file may come in again before we have the size. This
250// is ok, it just means that we'll be off in our time estimate
251//
252 if (Popts & XRDEXP_NOCHECK) statbuff.st_size = 1024*1024*1024;
253 else {StageAccess.UnLock();
254 if ((rc = MSS_Stat(remote_path, &statbuff))) return rc;
255 StageAccess.Lock(&XrdOssStage_Req::StageMutex);
256 }
257
258// Create a new request
259//
260 if (!(newreq = new XrdOssStage_Req(req.hash, fn)))
261 return OssEroute.Emsg("Stage",-ENOMEM,"create req for",fn);
262
263// Add this request to the list of requests
264//
266
267// Recalculate the cumalitive pending stage queue and
268//
269 newreq->size = statbuff.st_size;
270 pndbytes += statbuff.st_size;
271
272// Calculate the system priority
273//
274 if (!(val = env.Get(OSS_SYSPRTY))) prty = OSS_USE_PRTY;
275 else if (XrdOuca2x::a2i(OssEroute,"system prty",val,&prty,0)
276 || prty > OSS_MAX_PRTY) return -XRDOSS_E8010;
277 else prty = prty << 8;
278
279// Calculate the user priority
280//
281 if (OptFlags & XrdOss_USRPRTY && (val = env.Get(OSS_USRPRTY)))
282 {if (XrdOuca2x::a2i(OssEroute,"user prty",val,&rc,0)
283 || rc > OSS_MAX_PRTY) return -XRDOSS_E8010;
284 prty |= rc;
285 }
286
287// Queue the request at the right position and signal an xfr thread
288//
289 if ((oldreq = XrdOssStage_Req::StageQ.pendList.Apply(XrdOssFind_Prty,(void *)&prty)))
290 oldreq->pendList.Insert(&newreq->pendList);
293
294// Return the estimated time to arrival
295//
296 return CalcTime(newreq);
297}
298
299/******************************************************************************/
300/* S t a g e _ I n */
301/******************************************************************************/
302
303void *XrdOssSys::Stage_In(void *carg)
304{
306 XrdOssStage_Req *req;
307 int rc, alldone = 0;
308 time_t etime;
309
310 // Wait until something shows up in the ready queue and process
311 //
313
314 // Obtain exclusive control over the queues
315 //
317
318 // Check if we really have something in the queue
319 //
320 if (XrdOssStage_Req::StageQ.pendList.Singleton())
322 continue;
323 }
324
325 // Remove the last entry in the queue
326 //
328 req = rnode->Item();
329 rnode->Remove();
330 req->flags |= XRDOSS_REQ_ACTV;
331
332 // Account for bytes being moved
333 //
334 pndbytes -= req->size;
335 stgbytes += req->size;
336
337 // Bring in the file (don't hold the stage lock while doing so)
338 //
340 etime = time(0);
341 rc = GetFile(req);
342 etime = time(0) - etime;
344
345 // Account for resources and adjust xfr rate
346 //
347 stgbytes -= req->size;
348 if (!rc)
349 {if (etime > 1)
350 {xfrspeed=((xfrspeed*(totreqs+1))+(req->size/etime))/(totreqs+1);
351 if (xfrspeed < 512000) xfrspeed = 512000;
352 }
353 totreqs++; // Successful requests
354 totbytes += req->size;
355 delete req;
356 }
357 else {req->flags &= ~XRDOSS_REQ_ACTV;
358 req->flags |= (rc == 2 ? XRDOSS_REQ_ENOF : XRDOSS_REQ_FAIL);
359 req->sigtod = xfrhold + time(0);
360 badreqs++;
361 }
362
363 // Check if we should continue or be terminated and unlock staging
364 //
365 if ((alldone = (xfrthreads < xfrtcount)))
366 xfrtcount--;
368
369 } while (!alldone);
370
371// Notmally we would never get here
372//
373 return (void *)0;
374}
375
376
377/******************************************************************************/
378/* P r i v a t e M e t h o d s */
379/******************************************************************************/
380/******************************************************************************/
381/* C a l c T i m e */
382/******************************************************************************/
383
385{
386
387// For queued staging we have no good way to estimate the time, as of yet.
388// So, return 60 seconds. Note that the following code, which is far more
389// elaborate, rarely returns the right estimate anyway.
390//
391 return (StageAsync ? -EINPROGRESS : 60);
392}
393
394int XrdOssSys::CalcTime(XrdOssStage_Req *req) // StageMutex lock held!
395{
396 unsigned long long numq = 1;
397 unsigned long long tbytes = req->size + stgbytes/2;
398 int xfrtime;
399 time_t now;
400 XrdOssStage_Req *rqp = req;
401
402// Return an EINP{ROG if we are doing async staging
403//
404 if (StageAsync) return -EINPROGRESS;
405
406// If the request is active, recalculate the time based on previous estimate
407//
408 if (req->flags & XRDOSS_REQ_ACTV)
409 {if ((xfrtime = req->sigtod - time(0)) > xfrovhd) return xfrtime;
410 else return (xfrovhd < 4 ? 2 : xfrovhd / 2);
411 }
412
413// Calculate the number of pending bytes being transferred plus 1/2 of the
414// current number of bytes being transferred
415//
416 while ((rqp=(rqp->pendList.Next()->Item()))) {tbytes += rqp->size; numq++;}
417
418// Calculate when this request should be completed
419//
420 now = time(0);
421 req->sigtod = tbytes / xfrspeed + numq * xfrovhd + now;
422
423// Calculate the time it will take to get this file
424//
425 if ((xfrtime = req->sigtod - now) <= xfrovhd) return xfrovhd+3;
426 return xfrtime;
427}
428
429/******************************************************************************/
430/* G e t F i l e */
431/******************************************************************************/
432
434{
435 char rfs_fn[MAXPATHLEN+1];
436 char lfs_fn[MAXPATHLEN+1];
437 int retc;
438
439// Convert the local filename and generate the corresponding remote name.
440//
441 if ( (retc = GenLocalPath(req->path, lfs_fn)) ) return retc;
442 if ( (retc = GenRemotePath(req->path, rfs_fn)) ) return retc;
443
444// Run the command to get the file
445//
446 if ((retc = StageProg->Run(rfs_fn, lfs_fn)))
447 {OssEroute.Emsg("Stage", retc, "stage", req->path);
448 return (retc == 2 ? -ENOENT : -XRDOSS_E8009);
449 }
450
451// All went well
452//
453 return 0;
454}
455
456/******************************************************************************/
457/* g e t I D */
458/******************************************************************************/
459
460int XrdOssSys::getID(const char *Tid, XrdOucEnv &Env, char *buff, int bsz)
461{
462 char *bP;
463 int n;
464
465// The buffer always starts with a '+'
466//
467 *buff = '+'; bP = buff+1; bsz -= 3;
468
469// Get the trace id
470//
471 if (Tid && (n = strlen(Tid)) <= bsz) {strcpy(bP, Tid); bP += n;}
472
473// Insert space
474//
475 *bP++ = ' '; *bP = '\0';
476 return bP - buff;
477}
478
479/******************************************************************************/
480/* H a s F i l e */
481/******************************************************************************/
482
483time_t XrdOssSys::HasFile(const char *fn, const char *fsfx, time_t *mTime)
484{
485 struct stat statbuff;
486 int fnlen;
487 char path[MAXPATHLEN+8];
488 char *pp = path;
489
490// Copy the path with possible conversion
491//
492 if (xfrFdir)
493 {strcpy(path, xfrFdir);
494 pp = path + xfrFdln;
495 }
496 if (GenLocalPath(fn, pp)) return 0;
497 fnlen = strlen(path);
498
499// Add the suffix
500//
501 if ((fnlen + strlen(fsfx)) >= sizeof(path)) return 0;
502 strcpy(path+fnlen, fsfx);
503
504// Now check if the file actually exists
505//
506 if (stat(path, &statbuff)) return 0;
507 if (mTime) *mTime = statbuff.st_mtime;
508 return statbuff.st_ctime;
509}
XrdSysError OssEroute
#define XrdOss_USRPRTY
#define XRDOSS_E8006
#define XRDOSS_E8009
#define XRDOSS_E8010
#define XRDOSS_E8025
#define OSS_SYSPRTY
#define OSS_MAX_PRTY
#define OSS_USE_PRTY
#define OSS_USRPRTY
int XrdOssFind_Prty(XrdOssStage_Req *req, void *carg)
int XrdOssFind_Req(XrdOssStage_Req *req, void *carg)
unsigned long XrdOucHashVal(const char *KeyVal)
int XrdOssScrubScan(const char *key, char *cip, void *xargp)
#define XRDOSS_FAIL_FILE
XrdSysError OssEroute
#define XRDOSS_REQ_ACTV
#define XRDOSS_REQ_FAIL
#define XRDOSS_REQ_ENOF
#define XRDEXP_NOCHECK
@ Hash_data_is_key
Definition XrdOucHash.hh:52
#define stat(a, b)
Definition XrdPosix.hh:101
int Mode
int Add(char Opc, const char *Lfn, const char *Opq, const char *Usr, const char *Rid, const char *Nop, const char *Pop, int Prty=1)
static XrdSysSemaphore ReadyRequest
XrdOucDLlist< XrdOssStage_Req > pendList
static XrdOssStage_Req StageQ
const char * path
unsigned long long size
unsigned long hash
static XrdSysMutex StageMutex
XrdOucDLlist< XrdOssStage_Req > fullList
int GenRemotePath(const char *, char *)
Definition XrdOssApi.cc:249
virtual int Stage(const char *, const char *, XrdOucEnv &, int, mode_t, unsigned long long)
time_t HasFile(const char *fn, const char *sfx, time_t *mTime=0)
long long totbytes
Definition XrdOssApi.hh:302
char * StageEvents
Definition XrdOssApi.hh:230
int Stage_RT(const char *, const char *, XrdOucEnv &, unsigned long long)
int StageRealTime
Definition XrdOssApi.hh:221
int xfrthreads
Definition XrdOssApi.hh:298
void * Stage_In(void *carg)
int StageFormat
Definition XrdOssApi.hh:224
char * StageAction
Definition XrdOssApi.hh:233
int StageActLen
Definition XrdOssApi.hh:232
XrdOucProg * StageProg
Definition XrdOssApi.hh:306
int GetFile(XrdOssStage_Req *req)
int GenLocalPath(const char *, char *)
Definition XrdOssApi.cc:232
XrdOucName2Name * lcl_N2N
Definition XrdOssApi.hh:254
int StageAsync
Definition XrdOssApi.hh:222
long long pndbytes
Definition XrdOssApi.hh:300
int MSS_Stat(const char *, struct stat *buff=0)
Definition XrdOssMSS.cc:253
long long stgbytes
Definition XrdOssApi.hh:301
int CalcTime()
XrdOucMsubs * StageSnd
Definition XrdOssApi.hh:227
XrdFrcProxy * StageFrm
Definition XrdOssApi.hh:228
int StageEvSize
Definition XrdOssApi.hh:231
int getID(const char *, XrdOucEnv &, char *, int)
int xfrtcount
Definition XrdOssApi.hh:299
int Stage_QT(const char *, const char *, XrdOucEnv &, int, mode_t)
char * StageCmd
Definition XrdOssApi.hh:225
XrdOucName2Name * rmt_N2N
Definition XrdOssApi.hh:255
char * xfrFdir
Definition XrdOssApi.hh:311
void Insert(XrdOucDLlist *Node, T *Item=0)
XrdOucDLlist * Next()
XrdOucDLlist * Prev()
char * Env(int &envlen)
Definition XrdOucEnv.hh:48
char * Get(const char *varname)
Definition XrdOucEnv.hh:69
T * Apply(int(*func)(const char *, T *, void *), void *Arg)
T * Add(const char *KeyVal, T *KeyData, const int LifeTime=0, XrdOucHash_Options opt=Hash_default)
int Subs(XrdOucMsubsInfo &Info, char **Data, int *Dlen)
static const int maxElem
virtual int lfn2rfn(const char *lfn, char *buff, int blen)=0
int Run(XrdOucStream *Sp, const char *argV[], int argc=0, const char *envV[]=0) const
int Feed(const char *data[], const int dlen[])
Definition XrdOucProg.cc:63
char * ID(char *buff, int blen)
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Lock(XrdSysMutex *Mutex)