XRootD
Loading...
Searching...
No Matches
XrdSysIOEventsPollPoll.icc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d P o l l P o l l . i c c */
4/* */
5/* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <unistd.h>
31#include <cstdlib>
32#include <signal.h>
33
34#include "XrdSys/XrdSysE2T.hh"
35#include "XrdSys/XrdSysError.hh"
36#include "Xrd/XrdPollPoll.hh"
37#include "Xrd/XrdScheduler.hh"
38
39
40
41/******************************************************************************/
42/* C l a s s P o l l P o l l */
43/******************************************************************************/
44
45namespace XrdSys
46{
47namespace IOEvents
48{
49class PollPoll : public Poller
50{
51public:
52
53 PollPoll(int &rc, int numfd, int pFD[2]);
55
56protected:
57
58 void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg);
59
60 void Exclude(Channel *cP, bool &isLocked, bool dover=1);
61
62 bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
63
64 bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked);
65
66 void Shutdown();
67
68private:
69 void Dispatch(int ptent, int pollEv);
70 void FDMod(int ptnum, int fd, int events);
71 void FDRem(int ptnum);
72 bool Process();
73
74static const int disFD = 0x80000000;
75
76XrdSysRecMutex pollMutex;
77struct pollfd *pollTab;
78 int pollMax;
79 int pollNum;
80struct pollfd *pnewTab;
81 Channel **chnlTab;
82 int chnlMax;
83 int chnlNum;
84};
85};
86};
87
88/******************************************************************************/
89/* C l a s s P o l l e r */
90/******************************************************************************/
91/******************************************************************************/
92/* Static: n e w P o l l e r */
93/******************************************************************************/
94
96XrdSys::IOEvents::Poller::newPoller(int pipeFD[2],
97 int &eNum,
98 const char **eTxt)
99
100{
101 PollPoll *myPoller;
102
103// Allocate new poller
104//
105 if (!(myPoller = new PollPoll(eNum, 1024, pipeFD))) eNum = ENOMEM;
106
107// Check if all went ell
108//
109 if (!eNum) return (Poller *)myPoller;
110 delete myPoller;
111 if (eTxt) *eTxt = "creating poller";
112 return 0;
113}
114
115/******************************************************************************/
116/* C l a s s P o l l P o l l */
117/******************************************************************************/
118/******************************************************************************/
119/* C o n s t r c u t o r */
120/******************************************************************************/
121
122XrdSys::IOEvents::PollPoll::PollPoll(int &rc, int numfd, int pFD[2])
123 : Poller(pFD[0], pFD[1])
124{
125 int i;
126
127// Allocate initial poll table
128//
129 if (!(pollTab = (struct pollfd *)malloc(numfd*sizeof(struct pollfd))))
130 {rc = errno; return;}
131
132// Initialize it
133//
134 for (i = 1; i < numfd; i++)
135 {pollTab[i].fd = -1; pollTab[i].events = 0; pollTab[i].revents = 0;}
136
137// The first element of the poll tab is the communications pipe
138//
139 pollTab[0].fd = pFD[0];
140 pollTab[0].events = POLLIN | POLLRDNORM;
141 pollTab[0].revents = 0;
142
143// Initialize remaining poll data
144//
145 pollNum = 1;
146 pollMax = numfd;
147 pnewTab = 0;
148
149// Allocate initial channel table
150//
151 if (!(chnlTab = (Channel **)malloc(numfd*sizeof(Channel *))))
152 {rc = errno; return;}
153
154// Initialize it
155//
156 memset(chnlTab, 0, numfd*sizeof(Channel *));
157 chnlMax = numfd;
158 chnlNum = 1;
159
160// All done
161//
162 rc = 0;
163}
164
165/******************************************************************************/
166/* Protected: B e g i n */
167/******************************************************************************/
168
170 int &retcode,
171 const char **eTxt)
172{
173 int i, num2poll, numpolled;
174
175// Indicate to the starting thread that all went well
176//
177 retcode = 0;
178 *eTxt = 0;
179 syncsem->Post();
180
181// Now start dispatching channels that are ready. We use the wakePend flag to
182// keep the chatter down when we actually wakeup.
183//
184 pollMutex.Lock();
185 do {num2poll = pollNum;
186 pollMutex.UnLock();
187 do {numpolled = poll(pollTab, num2poll, TmoGet());}
188 while(numpolled < 0 && (errno == EAGAIN || errno == EINTR));
189 pollMutex.Lock();
190 wakePend = true;
191
192 if (pnewTab)
193 {memcpy(pnewTab, pollTab, pollMax*sizeof(struct pollfd));
194 free(pollTab); pollTab = pnewTab; pnewTab = 0; pollMax = chnlMax;
195 }
196
197 if (numpolled == 0) CbkTMO();
198 else if (numpolled < 0)
199 {int rc = errno;
200 //--------------------------------------------------------------
201 // If we are in a child process and the poll file descriptor
202 // has been closed, there is an immense chance the fork will be
203 // followed by an exec, in which case we don't want to abort
204 //--------------------------------------------------------------
205 if( rc == EBADF && parentPID != getpid() ) return;
206 std::cerr <<"PPoll: "<<XrdSysE2T(rc)<<" polling for events"<<std::endl;
207 abort();
208 }
209 else{if (pollTab[0].revents) numpolled--;
210 for (i = 1; i < num2poll && numpolled; i++)
211 {if (pollTab[i].revents)
212 {numpolled--;
213 Dispatch(i, pollTab[i].revents);
214 }
215 }
216 if (pollTab[0].revents && !Process()) return;
217 }
218 } while(1);
219}
220
221/******************************************************************************/
222/* Private: D i s p a t c h */
223/******************************************************************************/
224
225void XrdSys::IOEvents::PollPoll::Dispatch(int ptent, int pollEv)
226{
227 static const short pollER = POLLERR| POLLHUP | POLLNVAL;
228 static const short pollOK = POLLIN | POLLRDNORM | POLLPRI | POLLOUT;
229 static const short pollRD = POLLIN | POLLRDNORM | POLLPRI;
230 static const short pollWR = POLLOUT;
231 Channel *cP;
232 const char *eTxt;
233 int eNum, events = 0;
234
235// Check if we really have a channel here
236//
237 if (!(cP = chnlTab[ptent])) {FDRem(ptent); return;}
238
239// Translate the event to something reasonable
240//
241 if (pollEv & pollER)
242 {eTxt = "polling";
243 if (pollEv & POLLHUP) eNum = ECONNRESET;
244 else if (pollEv & POLLERR) eNum = EPIPE;
245 else if (pollEv & POLLNVAL)eNum = EBADF;
246 else eNum = EIO;
247 }
248 else if (pollEv & pollOK)
249 {if (pollEv & pollRD) events |= CallBack::ReadyToRead;
250 if (pollEv & pollWR) events |= CallBack::ReadyToWrite;
251 eNum = 0; eTxt = 0;
252 }
253 else {eTxt = "polling"; eNum = EIO;}
254
255// Execute the callback
256//
257 if (!CbkXeq(cP, events, eNum, eTxt)) FDRem(ptent);
258}
259
260/******************************************************************************/
261/* Protected: E x c l u d e */
262/******************************************************************************/
263
265 bool &isLocked, bool dover)
266{
267 int ctnum;
268
269// Verify that this channel is assigned.
270//
271 ctnum = GetPollEnt(cP);
272 pollMutex.Lock();
273 if (chnlTab[ctnum] != cP) {pollMutex.UnLock(); return;}
274 pollMutex.UnLock();
275
276// If we are the poller thread then we can remove this now. Note that we will
277// still have the poll mutex because the caller would have it as well.
278// Otherwise, send a message to the poller to do this. We will need to release
279// the channel lock to prevent deadlocks. The caller will relock as needed.
280// This message always synchronizes with the poller.
281//
282 if (ISPOLLER)
283 {FDRem(ctnum);
284 return;
285 } else {
286 PipeData cmdbuff((char)PipeData::RmFD,0,(short)ctnum,cP->GetFD());
287 if (isLocked) {isLocked = false; UnLockChannel(cP);}
288 SendCmd(cmdbuff);
289 }
290}
291
292/******************************************************************************/
293/* Private: F D M o d */
294/******************************************************************************/
295
296// pollMutex must be locked
297//
298void XrdSys::IOEvents::PollPoll::FDMod(int ptnum, int fd, int events)
299{
300 XrdSysMutexHelper mHelper(pollMutex);
301
302// First step is to see if we need to swap to a new poll table
303//
304 if (pnewTab)
305 {memcpy(pnewTab, pollTab, pollMax*sizeof(struct pollfd));
306 free(pollTab);
307 pollTab = pnewTab; pnewTab = 0; pollMax = chnlMax;
308 }
309
310
311// Initialize poll table entry
312//
313 pollTab[ptnum].fd = fd;
314 pollTab[ptnum].events = 0;
315 pollTab[ptnum].revents = 0;
316 if (events & Channel:: readEvents)
317 pollTab[ptnum].events = POLLIN | POLLRDNORM;
318 if (events & Channel::writeEvents)
319 pollTab[ptnum].events |= POLLOUT;
320 if (!pollTab[ptnum].events && !(events & Channel::errorEvents))
321 pollTab[ptnum].fd |= disFD;
322
323// Reset poll marker, as needed
324//
325 if (chnlNum >= pollNum) pollNum = chnlNum+1;
326}
327
328/******************************************************************************/
329/* Private: F D R e m */
330/******************************************************************************/
331
332// pollMutex must be locked
333//
334void XrdSys::IOEvents::PollPoll::FDRem(int ptnum)
335{
336 int ctnum = ptnum;
337
338// Free up the channel
339//
340 chnlTab[ctnum] = 0;
341
342// See if we need to adjust the channel count
343//
344 if (ctnum == chnlNum-1)
345 {while(ctnum > 0 && !chnlTab[ctnum]) ctnum--;
346 chnlNum = ctnum+1;
347 }
348
349// Free up this entry
350//
351 pollTab[ptnum].fd = -1;
352 pollTab[ptnum].events = 0;
353 pollTab[ptnum].revents = 0;
354
355// Now see if we need to adjust our poll count
356//
357 if (ptnum == pollNum-1)
358 {while(ptnum > 0 && pollTab[ptnum].fd == -1) ptnum--;
359 pollNum = ptnum+1;
360 }
361}
362
363/******************************************************************************/
364/* I n c l u d e */
365/******************************************************************************/
366
368 int &eNum,
369 const char **eTxt,
370 bool &isLocked)
371{
372 static const int incVal = 256;
373 static const int cpSz = sizeof(Channel *);
374 static const int ptSz = sizeof(struct pollfd);
375 int fd, ctnum;
376
377// Validate the file descriptor
378//
379 fd = cP->GetFD();
380 if (fd & 0xffff0000)
381 {eNum = EBADF;
382 if (eTxt) *eTxt = "adding channel";
383 return false;
384 }
385
386// Make sure this channel is not already assigned to this poller
387//
388 if (GetPollEnt(cP))
389 {eNum = EEXIST;
390 if (eTxt) *eTxt = "adding channel";
391 return false;
392 }
393
394// Get the next channel table entry to be used
395//
396 pollMutex.Lock();
397 ctnum = 1;
398 while((ctnum < chnlMax) && (chnlTab[ctnum] != 0)) ctnum++;
399
400// Reallocate channel table if we don't have enough space. We also pre-allocate
401// a new poll table so that we can reflect failure to the caller as the poller
402// can't do that. The poller will swap the new one for the old one.
403//
404 if (ctnum >= chnlMax)
405 {Channel **cnewTab = (Channel **)realloc(chnlTab,(chnlMax+incVal)*cpSz);
406 if (pnewTab) free(pnewTab);
407 pnewTab = (struct pollfd *)malloc((chnlMax+incVal)*ptSz);
408 if (!cnewTab || !pnewTab)
409 {pollMutex.UnLock();
410 eNum = ENOMEM;
411 if (eTxt) *eTxt = "adding channel";
412 if (cnewTab) free(cnewTab);
413 if (pnewTab) free(pnewTab);
414 return false;
415 }
416 memset(&cnewTab[ctnum], 0, incVal*cpSz);
417 memset(&pnewTab[ctnum],-1, incVal*ptSz);
418 chnlTab = cnewTab; chnlMax += incVal; chnlNum = ctnum+1;
419 } else if (ctnum > chnlNum) chnlNum = ctnum;
420
421// Record the poll table entry in the channel
422//
423 chnlTab[ctnum] = cP;
424 SetPollEnt(cP, ctnum);
425 pollMutex.UnLock();
426
427// If we are the poller thread, then enable the poll entry in-line. Note that
428// we will still be holding the poll mutex because the caller also has it.
429// Otherwise, send a message to the poller to do this. We will need to release
430// the channel lock to prevent deadlocks. The caller will relock as needed.
431//
432 if (ISPOLLER)
433 {FDMod(ctnum, fd, cP->GetEvents());
434 return true;
435 } else {
436 PipeData cmdbuff((char)PipeData::MiFD, (char)cP->GetEvents(),
437 (short)ctnum, fd, 0);
438 if (isLocked) {isLocked = false; UnLockChannel(cP);}
439 SendCmd(cmdbuff);
440 }
441
442// All done
443//
444 return true;
445}
446
447/******************************************************************************/
448/* Protected: M o d i f y */
449/******************************************************************************/
450
452 int &eNum,
453 const char **eTxt,
454 bool &isLocked)
455{
456
457// If we are the poller thread, then modify the poll entry in-line. Otherwise,
458// send a modification message to the poller. This requires that we unlock the
459// channel to prevent any deadlocks. The caller will relock it as needed.
460//
461 if (ISPOLLER)
462 {FDMod(GetPollEnt(cP), cP->GetFD(), cP->GetEvents());
463 return true;
464 } else {
465 PipeData cmdbuff((char)PipeData::MdFD, (char)cP->GetEvents(),
466 (short)GetPollEnt(cP), cP->GetFD(), 0);
467 if (isLocked) {isLocked = false; UnLockChannel(cP);}
468 SendCmd(cmdbuff);
469 }
470
471// All done
472//
473 return true;
474}
475
476/******************************************************************************/
477/* Private: P r o c e s s */
478/******************************************************************************/
479
480bool XrdSys::IOEvents::PollPoll::Process()
481{
482// Get the pipe request and check out actions of interest.
483//
484 while(GetRequest())
485 {switch(reqBuff.req)
486 {case PipeData::MdFD: FDMod(reqBuff.ent, reqBuff.fd, reqBuff.evt);
487 break;
488 case PipeData::MiFD: FDMod(reqBuff.ent, reqBuff.fd, reqBuff.evt);
489 reqBuff.theSem->Post();
490 break;
491 case PipeData::RmFD: FDRem(reqBuff.ent);
492 reqBuff.theSem->Post();
493 break;
494 case PipeData::NoOp: break;
495 case PipeData::Post: reqBuff.theSem->Post();
496 break;
497 case PipeData::Stop: reqBuff.theSem->Post();
498 return false;
499 break;
500 default: break;
501 }
502 }
503
504// Return true
505//
506 return true;
507}
508
509/******************************************************************************/
510/* Protected: S h u t d o w n */
511/******************************************************************************/
512
514{
515 static XrdSysMutex shutMutex;
516
517// To avoid race conditions, we serialize this code
518//
519 shutMutex.Lock();
520
521// Release the appendages
522//
523 if (pollTab) {free(pollTab); pollTab = 0;}
524 if (pnewTab) {free(pnewTab); pnewTab = 0;}
525 if (chnlTab) {free(chnlTab); chnlTab = 0;}
526
527// All done
528//
529 shutMutex.UnLock();
530}
#define eMsg(x)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
#define ISPOLLER
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ errorEvents
Error event non-r/w specific.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eMsg)
PollPoll(int &rc, int numfd, int pFD[2])