libzypp 17.34.0
asyncdatasource.cpp
Go to the documentation of this file.
2
4#include <zypp-core/zyppng/base/AutoDisconnect>
5#include <zypp-core/zyppng/base/EventDispatcher>
7
8namespace zyppng {
9
11 {
12 if ( _writeNotifier.get() == &notify ) {
14 DBG << "Closing due to error when polling" << std::endl;
16 return;
17 }
18 readyWrite();
19 } else {
20
21 auto dev = std::find_if( _readFds.begin(), _readFds.end(),
22 [ &notify ]( const auto &dev ){ return ( dev._readNotifier.get() == &notify ); } );
23
24 if ( dev == _readFds.end() ) {
25 return;
26 }
27
28 readyRead( std::distance( _readFds.begin(), dev ) );
29 }
30 }
31
33 {
34 auto bytesToRead = z_func()->rawBytesAvailable( channel );
35 if ( bytesToRead == 0 ) {
36 // make sure to check if bytes are available even if the ioctl call returns something different
37 bytesToRead = 4096;
38 }
39
40 auto &_readBuf = _readChannels[channel];
41 char *buf = _readBuf.reserve( bytesToRead );
42 const auto bytesRead = z_func()->readData( channel, buf, bytesToRead );
43
44 if ( bytesRead <= 0 ) {
45 _readBuf.chop( bytesToRead );
46
47 switch( bytesRead ) {
48 // remote close , close the read channel
49 case 0: {
51 break;
52 }
53 // no data is available , just try again later
54 case -2: break;
55 // anything else
56 default:
57 case -1: {
59 break;
60 }
61 }
62 return;
63 }
64
65 if ( bytesToRead > bytesRead )
67
68 if ( channel == _currentReadChannel )
69 _readyRead.emit();
70
71 _channelReadyRead.emit( channel );
72 return;
73 }
74
76 {
77 const auto nwrite = _writeBuffer.frontSize();
78 if ( !nwrite ) {
79 // disable Write notifications so we do not wake up without the need to
80 _writeNotifier->setEnabled( false );
81 return;
82 }
83
84 const auto nBuf = _writeBuffer.front();
85 const auto written = eintrSafeCall( ::write, _writeFd, nBuf, nwrite );
86 if ( written == -1 ) {
87 switch ( errno ) {
88 case EACCES:
90 return;
91 case EAGAIN:
92#if EAGAIN != EWOULDBLOCK
93 case EWOULDBLOCK:
94#endif
95 return;
96 case EPIPE:
97 case ECONNRESET:
99 return;
100 default:
102 return;
103 }
104 return;
105 }
108
109 if ( _writeBuffer.size() == 0 )
110 _sigAllBytesWritten.emit();
111 }
112
123
125 {
126 auto &readFd = _readFds[channel];
127 // we do not clear the read buffer so code has the opportunity to read whats left in there
128 bool sig = readFd._readFd >= 0;
129 readFd._readNotifier.reset();
130 readFd._readFd = -1;
131 if ( sig )
132 _sigReadFdClosed.emit( channel, reason );
133 }
134
136
139
143
145 {
146 return std::shared_ptr<AsyncDataSource>( new AsyncDataSource );
147 }
148
149
150 bool AsyncDataSource::openFds ( const std::vector<int>& readFds, int writeFd )
151 {
152 Z_D();
153
154 if ( d->_mode != IODevice::Closed )
155 return false;
156
157 IODevice::OpenMode mode;
158
159 bool error = false;
160 for ( const auto readFd : readFds ) {
161 if ( readFd >= 0 ) {
162 mode |= IODevice::ReadOnly;
163 d->_readFds.push_back( {
164 readFd,
166 });
168 ERR << "Failed to set read FD to non blocking" << std::endl;
169 error = true;
170 break;
171 }
172 d->_readFds.back()._readNotifier->connect( &SocketNotifier::sigActivated, *d, &AsyncDataSourcePrivate::notifierActivated );
173 }
174 }
175
176 if ( writeFd >= 0 && !error ) {
177 mode |= IODevice::WriteOnly;
179 ERR << "Failed to set write FD to non blocking" << std::endl;
180 error = true;
181 } else {
182 d->_writeFd = writeFd;
183 d->_writeNotifier = SocketNotifier::create( writeFd, SocketNotifier::Write | AbstractEventSource::Error, false );
185 }
186 }
187
188 if( error || !IODevice::open( mode ) ) {
189 d->_mode = IODevice::Closed;
190 d->_readFds.clear();
191 d->_writeNotifier.reset();
192 d->_writeFd = -1;
193 return false;
194 }
195
196 // make sure we have enough read buffers
197 setReadChannelCount( d->_readFds.size() );
198 return true;
199 }
200
201 int64_t zyppng::AsyncDataSource::writeData( const char *data, int64_t count )
202 {
203 Z_D();
204 if ( count > 0 ) {
205 // we always use the write buffer, to make sure the fd is actually writeable
206 d->_writeBuffer.append( data, count );
207 d->_writeNotifier->setEnabled( true );
208 }
209 return count;
210 }
211
212 int64_t zyppng::AsyncDataSource::readData( uint channel, char *buffer, int64_t bufsize )
213 {
214 Z_D();
215 if ( channel >= d->_readFds.size() ) {
216 ERR << constants::outOfRangeErrMsg << std::endl;
217 throw std::logic_error( constants::outOfRangeErrMsg.data() );
218 }
219 const auto read = eintrSafeCall( ::read, d->_readFds[channel]._readFd, buffer, bufsize );
220 if ( read < 0 ) {
221 switch ( errno ) {
222 #if EAGAIN != EWOULDBLOCK
223 case EWOULDBLOCK:
224 #endif
225 case EAGAIN: {
226 return -2;
227 }
228 default:
229 break;
230 }
231 }
232 return read;
233 }
234
235 int64_t AsyncDataSource::rawBytesAvailable( uint channel ) const
236 {
237 Z_D();
238
239 if ( channel >= d->_readFds.size() ) {
240 ERR << constants::outOfRangeErrMsg << std::endl;
241 throw std::logic_error( constants::outOfRangeErrMsg.data() );
242 }
243
244 if ( isOpen() && canRead() )
245 return zyppng::bytesAvailableOnFD( d->_readFds[channel]._readFd );
246 return 0;
247 }
248
250 {
251 Z_D();
252 if ( channel >= d->_readFds.size() ) {
253 ERR << constants::outOfRangeErrMsg << std::endl;
254 throw std::logic_error( constants::outOfRangeErrMsg.data() );
255 }
256 }
257
259 {
260 Z_D();
261 for( uint i = 0; i < d->_readFds.size(); ++i ) {
262 auto &readChan = d->_readFds[i];
263 readChan._readNotifier.reset();
264 if ( readChan._readFd >= 0)
265 d->_sigReadFdClosed.emit( i, UserRequest );
266 }
267 d->_readFds.clear();
268
269 d->_writeNotifier.reset();
270 d->_writeBuffer.clear();
271 if ( d->_writeFd >= 0 ) {
272 d->_writeFd = -1;
273 d->_sigWriteFdClosed.emit( UserRequest );
274 }
275
277 }
278
280 {
281 Z_D();
282
283 // if we are open writeOnly, simply call close();
284 if ( !canRead() ) {
285 close();
286 return;
287 }
288
289 d->_mode = ReadOnly;
290 d->_writeNotifier.reset();
291 d->_writeBuffer.clear();
292
293 if ( d->_writeFd >= 0 ) {
294 d->_writeFd = -1;
295 d->_sigWriteFdClosed.emit( UserRequest );
296 }
297 }
298
299 bool AsyncDataSource::waitForReadyRead( uint channel, int timeout )
300 {
301 Z_D();
302 if ( !canRead() )
303 return false;
304
305 if ( channel >= d->_readFds.size() ) {
306 ERR << constants::outOfRangeErrMsg << std::endl;
307 throw std::logic_error( constants::outOfRangeErrMsg.data() );
308 }
309
310 bool gotRR = false;
311 auto rrConn = AutoDisconnect( d->_channelReadyRead.connect([&]( uint activated ){
312 gotRR = ( channel == activated );
313 }) );
314
315 // we can only wait if we are open for reading and still have a valid fd
316 auto &channelRef = d->_readFds[ channel ];
317 while ( readFdOpen(channel) && canRead() && !gotRR ) {
318 int rEvents = 0;
320 //simulate signal from read notifier
321 d->notifierActivated( *channelRef._readNotifier, rEvents );
322 } else {
323 //timeout
324 return false;
325 }
326 }
327 return gotRR;
328 }
329
331 {
332 Z_D();
333 if ( !canWrite() )
334 return;
335
336 int timeout = -1;
337 while ( canWrite() && d->_writeBuffer.frontSize() ) {
338 int rEvents = 0;
340 //simulate signal from write notifier
341 d->readyWrite();
342 } else {
343 //timeout
344 return;
345 }
346 }
347 }
348
353
355 {
356 return d_func()->_sigReadFdClosed;
357 }
358
360 {
361 Z_D();
362 if ( !d->_readChannels.size() )
363 return false;
364 return readFdOpen( d_func()->_currentReadChannel );
365 }
366
367 bool AsyncDataSource::readFdOpen(uint channel) const
368 {
369 Z_D();
370 if ( channel >= d->_readFds.size() ) {
371 ERR << constants::outOfRangeErrMsg << std::endl;
372 throw std::logic_error( constants::outOfRangeErrMsg.data() );
373 }
374 auto &channelRef = d->_readFds[ channel ];
375 return ( channelRef._readNotifier && channelRef._readFd >= 0 );
376 }
377
378}
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Definition AutoDispose.h:95
void reset()
Reset to default Ctor values.
void closeReadChannel(uint channel, AsyncDataSource::ChannelCloseReason reason)
void closeWriteChannel(AsyncDataSource::ChannelCloseReason reason)
void notifierActivated(const SocketNotifier &notify, int evTypes)
Signal< void(uint, AsyncDataSource::ChannelCloseReason)> _sigReadFdClosed
std::vector< ReadChannelDev > _readFds
Signal< void(AsyncDataSource::ChannelCloseReason)> _sigWriteFdClosed
int64_t writeData(const char *data, int64_t count) override
SignalProxy< void(uint, AsyncDataSource::ChannelCloseReason)> sigReadFdClosed()
void readChannelChanged(uint channel) override
bool waitForReadyRead(uint channel, int timeout) override
bool openFds(const std::vector< int > &readFds, int writeFd=-1)
int64_t readData(uint channel, char *buffer, int64_t bufsize) override
std::shared_ptr< AsyncDataSource > Ptr
SignalProxy< void(AsyncDataSource::ChannelCloseReason)> sigWriteFdClosed()
int64_t rawBytesAvailable(uint channel) const override
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
int64_t size() const
Definition iobuffer.cc:154
char * front()
Definition iobuffer.cc:35
int64_t discard(int64_t bytes)
Definition iobuffer.cc:55
int64_t frontSize() const
Definition iobuffer.cc:43
Signal< void()> _readyRead
Definition iodevice_p.h:44
Signal< void()> _sigAllBytesWritten
Definition iodevice_p.h:47
Signal< void(int64_t)> _sigBytesWritten
Definition iodevice_p.h:46
std::vector< IOBuffer > _readChannels
Definition iodevice_p.h:39
IODevice::OpenMode _mode
Definition iodevice_p.h:43
Signal< void(uint)> _channelReadyRead
Definition iodevice_p.h:45
void setReadChannelCount(uint channels)
Definition iodevice.cc:37
bool canWrite() const
Definition iodevice.cc:78
virtual void close()
Definition iodevice.cc:30
bool canRead() const
Definition iodevice.cc:73
bool isOpen() const
Definition iodevice.cc:83
virtual bool open(const OpenMode mode)
Definition iodevice.cc:16
SignalProxy< void(const SocketNotifier &sock, int evTypes) sigActivated)()
static Ptr create(int socket, int evTypes, bool enable=true)
@ FailedToSetMode
Failed to block or unblock the fd.
BlockingMode setFDBlocking(int fd, bool mode)
Definition IOTools.cc:31
constexpr std::string_view outOfRangeErrMsg("Channel index out of range")
auto eintrSafeCall(Fun &&function, Args &&... args)
int64_t bytesAvailableOnFD(int fd)
#define DBG
Definition Logger.h:97
#define ERR
Definition Logger.h:100
#define ZYPP_IMPL_PRIVATE(Class)
Definition zyppglobal.h:91
#define Z_D()
Definition zyppglobal.h:104