libzypp  17.31.31
providequeue.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 \---------------------------------------------------------------------*/
9 
10 #include "private/providequeue_p.h"
11 #include "private/provideitem_p.h"
12 #include "private/provide_p.h"
14 #include "private/providedbg_p.h"
15 
16 #include <zypp-core/fs/PathInfo.h>
17 #include <zypp-core/zyppng/rpc/MessageStream>
18 #include <zypp-core/base/StringV.h>
20 #include <zypp-media/MediaException>
21 #include <zypp-media/auth/CredentialManager>
22 
23 #include <zypp/APIConfig.h>
24 #include <variant>
25 #include <bitset>
26 
27 namespace zyppng {
28 
30  {
31  return ( _request->code () == ProvideMessage::Code::Attach );
32  }
33 
35  {
36  return ( _request->code () == ProvideMessage::Code::Provide );
37  }
38 
40  {
41  return ( _request->code () == ProvideMessage::Code::Detach );
42  }
43 
45  { }
46 
48  {
50  if ( this->_activeItems.size() || this->_waitQueue.size() ) {
51  DBG << "Queue shutdown with Items still running" << std::endl;
52  }
53  }
54  immediateShutdown(std::make_exception_ptr(zypp::media::MediaException("Cancelled by queue shutdown")));
55  }
56 
57  bool ProvideQueue::startup(const std::string &workerScheme, const zypp::filesystem::Pathname &workDir, const std::string &hostname ) {
58 
59  if ( _workerProc ) {
60  ERR << "Queue Worker was already initialized" << std::endl;
61  return true;
62  }
63 
65 
66  const auto &pN = _parent.workerPath() / ( "zypp-media-"+workerScheme ) ;
67  MIL << "Trying to start " << pN << std::endl;
68  const auto &pi = zypp::PathInfo( pN );
69  if ( !pi.isExist() ) {
70  ERR << "Failed to find worker for " << workerScheme << std::endl;
71  return false;
72  }
73 
74  if ( !pi.userMayX() ) {
75  ERR << "Failed to start worker for " << workerScheme << " binary " << pi.asString() << " is not executable." << std::endl;
76  return false;
77  }
78 
79  if ( zypp::filesystem::assert_dir( workDir ) != 0 ) {
80  ERR << "Failed to assert working directory '" << workDir << "' for worker " << workerScheme << std::endl;
81  return false;
82  }
83 
84  _currentExe = pN;
85  _workerProc = Process::create();
86  _workerProc->setWorkingDirectory ( workDir );
87  _messageStream = RpcMessageStream::create( _workerProc );
88  return doStartup();
89  }
90 
91 
92  void ProvideQueue::enqueue( ProvideRequestRef request )
93  {
94  Item i;
95  i._request = request;
96  i._request->provideMessage().setRequestId( nextRequestId() );
97  request->setCurrentQueue( shared_this<ProvideQueue>() );
98  _waitQueue.push_back( std::move(i) );
99  if ( _parent.isRunning() )
100  scheduleNext();
101  }
102 
103  void ProvideQueue::cancel( ProvideRequest *item , std::exception_ptr error )
104  {
105  const auto &isSameItem = [item]( const Item &i ){
106  if ( i.isDetachRequest () )
107  return false;
108  return i._request.get() == item;
109  };
110 
111  if ( !item )
112  return;
113 
114  if ( item->code() != ProvideMessage::Code::Attach
115  && item->code() != ProvideMessage::Code::Provide ) {
116  ERR << "Can not cancel a " << item->code() << " request!" << std::endl;
117  return;
118  }
119 
120  if ( auto i = std::find_if( _waitQueue.begin(), _waitQueue.end(), isSameItem ); i != _waitQueue.end() ) {
121  auto &reqRef = i->_request;
122  reqRef->setCurrentQueue(nullptr);
123  if ( reqRef->owner() )
124  reqRef->owner()->finishReq( this, reqRef, error );
125  _waitQueue.erase(i);
126  _parent.schedule( ProvidePrivate::FinishReq ); // let the parent scheduler run since we have a open spot now
127  } else if ( auto i = std::find_if( _activeItems.begin(), _activeItems.end(), isSameItem ); i != _activeItems.end() ) {
128  cancelActiveItem(i, error);
129  }
130  }
131 
132  std::list<ProvideQueue::Item>::iterator ProvideQueue::dequeueActive( std::list<Item>::iterator it )
133  {
134  if ( it == _activeItems.end() )
135  return it;
136 
137  if ( it->_request )
138  it->_request->setCurrentQueue( nullptr );
139 
140  auto i = _activeItems.erase(it);
141  _parent.schedule ( ProvidePrivate::FinishReq ); // Trigger the scheduler
142  scheduleNext (); // keep the active items full
143  return i;
144  }
145 
146  void ProvideQueue::fatalWorkerError( const std::exception_ptr &reason )
147  {
148  immediateShutdown( reason ? reason : std::make_exception_ptr( zypp::media::MediaException("Fatal worker error")) );
149  }
150 
151  void ProvideQueue::immediateShutdown( const std::exception_ptr &reason )
152  {
153  _queueShuttingDown = true;
154 
155  while ( _waitQueue.size() ) {
156  auto &item = _waitQueue.front();
157  auto &reqRef = item._request;
158  if ( reqRef && reqRef->owner() && !item.isDetachRequest() )
159  reqRef->owner()->finishReq( this, reqRef, reason );
160  _waitQueue.pop_front();
161  }
162 
163  for ( auto i = _activeItems.begin(); i != _activeItems.end(); ) {
164  auto &reqRef = i->_request;
165  if ( reqRef && reqRef->owner() && !i->isDetachRequest() ) {
166  i = cancelActiveItem(i, reason );
167  } else {
168  i++;
169  }
170  }
171 
172  if ( _workerProc && _workerProc->isRunning() ) {
173  _workerProc->flush();
174  _workerProc->closeWriteChannel();
175  _workerProc->waitForExit();
176  readAllStderr();
177  }
178  }
179 
180  std::list< ProvideQueue::Item >::iterator ProvideQueue::cancelActiveItem( std::list< Item >::iterator i , const std::__exception_ptr::exception_ptr &error )
181  {
182  auto &reqRef = i->_request;
183 
184  // already in cancelling process or finished
185  if ( i->_state == Item::Cancelling || i->_state == Item::Finished )
186  return (++i);
187 
188  // not possible but lets be safe
189  if ( i->_state == Item::Pending ) {
190  reqRef->setCurrentQueue(nullptr);
191  if ( reqRef->owner() )
192  reqRef->owner()->finishReq( this, reqRef, error );
193  return dequeueActive(i);
194  }
195 
196  // we first need to cancel the item
197  auto c = ProvideMessage::createCancel ( i->_request->provideMessage().requestId() );
198  if( !_messageStream->sendMessage(c.impl()) )
199  ERR << "Failed to send cancel message to worker" << std::endl;
200 
201  i->_state = Item::Cancelling;
202  reqRef->setCurrentQueue(nullptr);
203  if ( reqRef->owner() )
204  reqRef->owner()->finishReq( this, reqRef, error );
205  reqRef.reset();
206  return (++i);
207  }
208 
210  {
211  if ( _queueShuttingDown )
212  return;
213 
214  while ( _waitQueue.size() && canScheduleMore() ) {
215  auto item = std::move( _waitQueue.front() );
216  _waitQueue.pop_front();
217 
218  auto &reqRef = item._request;
219  if ( !reqRef->activeUrl() ) {
220  ERR << "Item without active URL enqueued, this is a BUG." << std::endl;
221  if ( reqRef->owner() )
222  reqRef->owner()->finishReq( this, reqRef, ZYPP_EXCPT_PTR (zypp::media::MediaException("Item needs a activeURL to be queued.")) );
223  continue;
224  }
225 
226  if ( !_messageStream->sendMessage( reqRef->provideMessage().impl() ) ) {
227  ERR << "Failed to send message to worker process." << std::endl;
228  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
229  return;
230  }
231 
232  item._state = Item::Queued;
233  _activeItems.push_back( std::move(item) );
234  _idleSince.reset();
235  }
236 
237  if ( _waitQueue.empty() && _activeItems.empty() ) {
239  if ( !_idleSince )
240  _idleSince = std::chrono::steady_clock::now();
241  _sigIdle.emit();
242  }
243  }
244 
246  {
247  return ( _activeItems.size() == 0 || ( _capabilities.cfg_flags () & zypp::proto::Capabilities::Pipeline ) == zypp::proto::Capabilities::Pipeline );
248  }
249 
250  bool ProvideQueue::isIdle() const
251  {
252  return ( empty() );
253  }
254 
255  std::optional<ProvideQueue::TimePoint> ProvideQueue::idleSince() const
256  {
257  return _idleSince;
258  }
259 
260  bool ProvideQueue::empty() const
261  {
262  return ( _activeItems.empty() && _waitQueue.empty() );
263  }
264 
266  {
267  return _activeItems.size() + _waitQueue.size();
268  }
269 
271  {
272  return _activeItems.size();
273  }
274 
276  {
277  zypp::ByteCount dlSize;
278  for ( const auto &i : _waitQueue ) {
279  if ( i.isDetachRequest () )
280  continue;
281 
282  auto &reqRef = i._request;
283  if ( reqRef->code() != ProvideMessage::Code::Provide )
284  continue;
285  dlSize += reqRef->provideMessage().value( ProvideMsgFields::ExpectedFilesize, int64_t(0) ).asInt64();
286  }
287  for ( const auto &i : _activeItems ) {
288  if ( i.isDetachRequest () )
289  continue;
290  auto &reqRef = i._request;
291  if ( reqRef->code() != ProvideMessage::Code::Provide )
292  continue;
293  dlSize += reqRef->provideMessage().value( ProvideMsgFields::ExpectedFilesize, int64_t(0) ).asInt64();
294  }
295  return dlSize;
296  }
297 
298  const std::string &ProvideQueue::hostname() const
299  {
300  return _myHostname;
301  }
302 
304  {
305  return _capabilities;
306  }
307 
308  SignalProxy<void ()> ProvideQueue::sigIdle()
309  {
310  return _sigIdle;
311  }
312 
314  {
315  if ( _currentExe.empty() )
316  return false;
317 
318  //const char *argv[] = { "gdbserver", ":10000", _currentExe.c_str(), nullptr };
319  const char *argv[] = { _currentExe.c_str(), nullptr };
320  if ( !_workerProc->start( argv) ) {
321  ERR << "Failed to execute worker" << std::endl;
322 
323  _messageStream.reset ();
324  _workerProc.reset ();
325 
326  return false;
327  }
328 
329  // make sure the default read channel is StdOut so RpcMessageStream gets all the rpc messages
330  _workerProc->setReadChannel ( Process::StdOut );
331 
332  // we are ready to send the data
333 
335  // @TODO actually write real config data :D
336  conf.mutable_values ()->insert ( { AGENT_STRING_CONF.data (), "ZYpp " LIBZYPP_VERSION_STRING } );
337  conf.mutable_values ()->insert ( { ATTACH_POINT.data (), _workerProc->workingDirectory().asString() } );
338  conf.mutable_values ()->insert ( { PROVIDER_ROOT.data (), _parent.z_func()->providerWorkdir().asString() } );
339 
340  const auto &cleanupOnErr = [&](){
341  readAllStderr();
342  _messageStream.reset ();
343  _workerProc->close();
344  _workerProc.reset();
345  return false;
346  };
347 
348  if ( !_messageStream->sendMessage( conf ) ) {
349  ERR << "Failed to send initial message to queue worker" << std::endl;
350  return cleanupOnErr();
351  }
352 
353  // wait for the data to be written
354  _workerProc->flush ();
355 
356  // wait until we receive a message
357  const auto &caps = _messageStream->nextMessageWait();
358  if ( !caps || caps->messagetypename() != rpc::messageTypeName<zypp::proto::Capabilities>() ) {
359  ERR << "Worker did not sent a capabilities message, aborting" << std::endl;
360  return cleanupOnErr();
361  }
362 
363  {
364  auto p = _messageStream->parseMessage<zypp::proto::Capabilities>( *caps );
365  if ( !p )
366  return cleanupOnErr();
367 
368  _capabilities = std::move(*p);
369  }
370 
371  DBG << "Received config for worker: " << this->_currentExe.asString() << " Worker Type: " << this->_capabilities.worker_type() << " Flags: " << std::bitset<32>( _capabilities.cfg_flags() ).to_string() << std::endl;
372 
373  // now we can set up signals and start processing messages
374  connect( *_messageStream, &RpcMessageStream::sigMessageReceived, *this, &ProvideQueue::processMessage );
375  connect( *_workerProc, &IODevice::sigChannelReadyRead, *this, &ProvideQueue::processReadyRead );
376  connect( *_workerProc, &Process::sigFinished, *this, &ProvideQueue::procFinished );
377 
378  // make sure we do not miss messages
379  processMessage();
380  return true;
381  }
382 
384 
385  const auto &getRequest = [&]( const auto &exp ) -> decltype(_activeItems)::iterator {
386  if ( !exp ) {
387  ERR << "Ignoring invalid request!" << std::endl;
388  return _activeItems.end();
389  }
390 
391  auto i = std::find_if( _activeItems.begin(), _activeItems.end(), [&]( const auto &elem ) {
392  return exp->requestId() == elem._request->provideMessage().requestId();
393  });
394 
395  if ( i == _activeItems.end() ) {
396  ERR << "Ignoring unknown request ID: " << exp->requestId() << std::endl;
397  return _activeItems.end();
398  }
399 
400  return i;
401  };
402 
403  const auto &sendErrorToWorker = [&]( const uint32_t reqId, const uint code, const std::string &reason, bool transient = false ) {
404  auto r = ProvideMessage::createErrorResponse ( reqId, code, reason, transient );
405  if ( !_messageStream->sendMessage( r.impl() ) ) {
406  ERR << "Failed to send Error message to worker process." << std::endl;
407  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
408  return false;
409  }
410  return true;
411  };
412 
413  const bool doesDownload = this->_capabilities.worker_type() == Config::Downloading;
414  const bool fileNeedsCleanup = doesDownload || ( _capabilities.worker_type() == Config::CPUBound && _capabilities.cfg_flags() & Config::FileArtifacts );
415 
416  while ( auto msg = _messageStream->nextMessage () ) {
417 
418  if ( msg->messagetypename() == rpc::messageTypeName<zypp::proto::ProvideMessage>() ) {
419 
420  const auto &provMsg = ProvideMessage::create(*msg);
421  if ( !provMsg ) {
422  fatalWorkerError( provMsg.error() );
423  return;
424  }
425 
426  const auto &reqIter = getRequest( provMsg );
427  if ( reqIter == _activeItems.end() ) {
428  if ( provMsg->code() == ProvideMessage::Code::ProvideFinished && fileNeedsCleanup ) {
429  const auto locFName = provMsg->value( ProvideFinishedMsgFields::LocalFilename ).asString();
430  if ( !_parent.isInCache(locFName) ) {
431  MIL << "Received a ProvideFinished message for a non existant request. Since this worker reported to create file artifacts, the file is cleaned up." << std::endl;
432  zypp::filesystem::unlink( locFName );
433  }
434  }
435  continue;
436  }
437 
438  auto &req = *reqIter;
439  auto &reqRef =req._request;
440 
441  const auto code = provMsg->code();
442 
443  if ( code >= ProvideMessage::Code::FirstInformalCode && code <= ProvideMessage::Code::LastInformalCode ) {
444 
445  // send the message to the item but don't dequeue
446  if ( reqRef && reqRef->owner() )
447  reqRef->owner()->informalMessage ( *this, reqRef, *provMsg );
448  continue;
449 
450  } else if ( code >= ProvideMessage::Code::FirstSuccessCode && code <= ProvideMessage::Code::LastSuccessCode ) {
451 
452  if ( req._state == Item::Cancelling ) {
453  req._state = Item::Finished;
454  dequeueActive( reqIter );
455  continue;
456  }
457 
458  if ( code == ProvideMessage::Code::ProvideFinished ) {
459 
460  // we are going to register the file to the cache if this is a downloading worker, so it can not leak
461  // no matter if the item does the correct dance or not, this code is duplicated by all ProvideItems that receive ProvideFinished
462  // results that require file cleanups.
463  // we keep the ref around until after sending the result to the item. At that point it should take a reference
464  std::optional<zypp::ManagedFile> dataRef;
465 
466  if ( !reqIter->isFileRequest() ) {
467  ERR << "Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
469  return;
470  }
471 
472  // when a worker is downloading we keep a internal book of cache files
473  if ( doesDownload ) {
474  const auto locFName = provMsg->value( ProvideFinishedMsgFields::LocalFilename ).asString();
475  if ( provMsg->value( ProvideFinishedMsgFields::CacheHit, false ).asBool()) {
476  dataRef = _parent.addToFileCache ( locFName );
477  if ( !dataRef ) {
478  MIL << "CACHE MISS, file " << locFName << " was already removed, queueing again" << std::endl;
479  if ( reqRef->owner() )
480  reqRef->owner()->cacheMiss( reqRef );
481  reqRef->provideMessage().setRequestId( InvalidId );
482  req._state = Item::Pending;
483  _waitQueue.push_front( req );
484  dequeueActive( reqIter );
485  continue;
486  }
487  } else {
488  dataRef = _parent.addToFileCache ( locFName );
489 
490  // unlikely this can happen but better be safe than sorry
491  if ( !dataRef ) {
492  req._state = Item::Finished;
493  reqRef->setCurrentQueue(nullptr);
494  auto resp = ProvideMessage::createErrorResponse ( provMsg->requestId(), ProvideMessage::Code::InternalError, "File vanished between downloading and adding it to cache." );
495  if ( reqRef->owner() )
496  reqRef->owner()->finishReq( *this, reqRef, resp );
497  dequeueActive( reqIter );
498  continue;
499  }
500  }
501  }
502  }
503 
504  // send the message to the item and dequeue
505  reqRef->setCurrentQueue(nullptr);
506  if ( reqRef->owner() )
507  reqRef->owner()->finishReq( *this, reqRef, *provMsg );
508  req._state = Item::Finished;
509  dequeueActive( reqIter );
510  continue;
511 
512  } else if ( code >= ProvideMessage::Code::FirstClientErrCode && code <= ProvideMessage::Code::LastSrvErrCode ) {
513 
514  if ( req._state == Item::Cancelling ) {
515  req._state = Item::Finished;
516  dequeueActive( reqIter );
517  continue;
518  }
519 
520  // send the message to the item and dequeue
521  reqRef->setCurrentQueue(nullptr);
522 
523  if ( reqRef->owner() )
524  reqRef->owner()->finishReq( *this, reqRef, *provMsg );
525 
526  req._state = Item::Finished;
527  dequeueActive( reqIter );
528  continue;
529 
530  } else if ( code >= ProvideMessage::Code::FirstRedirCode && code <= ProvideMessage::Code::LastRedirCode ) {
531 
532  // redir is like a finished message, we can simply forgot about a cancelling request
533  if ( req._state == Item::Cancelling ) {
534  req._state = Item::Finished;
535  dequeueActive( reqIter );
536  continue;
537  }
538 
539  // send the message to the item and dequeue
540  reqRef->setCurrentQueue(nullptr);
541  if ( reqRef->owner() )
542  reqRef->owner()->finishReq( *this, reqRef, *provMsg );
543  req._state = Item::Finished;
544  dequeueActive( reqIter );
545  continue;
546 
547  } else if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
548 
549  ERR << "Received Controller message from worker, this is a fatal error. Cancelling all requests!" << std::endl;
550  fatalWorkerError ( ZYPP_EXCPT_PTR( zypp::media::MediaException("Controller message received from worker.") ) );
551  return;
552 
553  } else if ( code >= ProvideMessage::Code::FirstWorkerCode && code <= ProvideMessage::Code::LastWorkerCode ) {
554 
555  if ( code == ProvideMessage::Code::AuthDataRequest ) {
556  if ( !reqIter->isFileRequest() && !reqIter->isAttachRequest() ) {
557  ERR << "Invalid message for request ID: " << reqRef->provideMessage().requestId() << std::endl;
559  return;
560  }
561 
562  // if the file was cancelled we send a failure back
563  if( reqIter->_state == Item::Cancelling ) {
564  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Item was cancelled") )
565  return;
566  continue;
567  }
568 
569  // we need a owner item to fetch the auth data for us
570  if ( !reqRef->owner() ) {
571  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Request has no owner" ) )
572  return;
573  continue;
574  }
575 
576  if ( !reqRef->activeUrl() ) {
577  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Item has no active URL, this is a bug." ) )
578  return;
579  continue;
580  }
581 
582  try {
583  zypp::Url u( provMsg->value( AuthDataRequestMsgFields::EffectiveUrl ).asString() );
584 
585  std::map<std::string, std::string> extraVals;
586  provMsg->forEachVal( [&]( const std::string &name, const zyppng::ProvideMessage::FieldVal &val ) {
587 
590  return true;
591 
592  if ( !val.isString() ) {
593  WAR << "Ignoring non string value for " << name << std::endl;
594  return true;
595  }
596 
597  extraVals[name] = val.asString();
598  return true;
599  });
600 
601  const auto &authOpt = reqRef->owner()->authenticationRequired( *this, reqRef, u, provMsg->value( AuthDataRequestMsgFields::LastAuthTimestamp ).asInt64(), extraVals );
602  if ( !authOpt ) {
603  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "No auth given by user." ) )
604  return;
605  continue;
606  }
607 
608  auto r = ProvideMessage::createAuthInfo ( reqRef->provideMessage().requestId(), authOpt->username(), authOpt->password(), authOpt->lastDatabaseUpdate(), authOpt->extraValues() );
609  if ( !_messageStream->sendMessage( r.impl() ) ) {
610  ERR << "Failed to send AuthorizationInfo to worker process." << std::endl;
611  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
612  return;
613  }
614  continue;
615 
616  } catch ( const zypp::Exception &e ) {
617  ZYPP_CAUGHT(e);
618  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, e.asString() ) )
619  return;
620  continue;
621  }
622 
623  } else if ( code == ProvideMessage::Code::MediaChangeRequest ) {
624 
625  if ( !reqIter->isAttachRequest() ) {
626  ERR << "Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
628  return;
629  }
630 
631  // if the file was cancelled we send a failure back
632  if( reqIter->_state == Item::Cancelling ) {
633  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort, "Item was cancelled" ) )
634  return;
635  continue;
636  }
637 
638  MIL << "Worker sent a MediaChangeRequest, asking the user to insert the correct medium" << std::endl;
639 
640  //const std::string &label, const int32_t mediaNr, const std::vector<std::string> &devices, const std::optional<std::string> &desc
641  std::vector<std::string> freeDevs;
642  for ( const auto &val : provMsg->values( MediaChangeRequestMsgFields::Device) ) {
643  freeDevs.push_back( val.asString() );
644  }
645 
646  std::optional<std::string> desc;
647  const auto &descVal = provMsg->value( MediaChangeRequestMsgFields::Desc );
648  if ( descVal.valid () && descVal.isString() )
649  desc = descVal.asString();
650 
651  auto res = _parent._sigMediaChange.emit(
652  _parent.queueName(*this),
653  provMsg->value( MediaChangeRequestMsgFields::Label ).asString(),
654  provMsg->value( MediaChangeRequestMsgFields::MediaNr ).asInt(),
655  freeDevs,
656  desc
657  );
658 
659  auto action = res ? *res : Provide::Action::ABORT;
660  switch ( action ) {
661  case Provide::Action::RETRY: {
662  MIL << "Sending back a MediaChanged message, retrying to find medium " << std::endl;
663  auto r = ProvideMessage::createMediaChanged ( reqIter->_request->provideMessage().requestId() );
664  if ( !_messageStream->sendMessage( r.impl() ) ){
665  ERR << "Failed to send MediaChanged to worker process." << std::endl;
666  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
667  return;
668  }
669  continue;
670  }
671  case Provide::Action::ABORT: {
672  MIL << "Sending back a MediaChangeFailure message, request will fail " << std::endl;
673  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort, "Cancelled by User" ) )
674  return;
675  continue;
676  }
677  case Provide::Action::SKIP: {
678  MIL << "Sending back a MediaChangeFailure message, request will fail " << std::endl;
679  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeSkip, "Skipped by User" ) )
680  return;
681  continue;
682  }
683  }
684  } else {
685  // if there is a unsupported worker request we need to stop immediately because the worker will be blocked until it gets a answer
686  ERR << "Unsupported worker request: "<<code<<", this is a fatal error!" << std::endl;
688  return;
689  }
690 
691  } else {
692  // unknown code
693  ERR << "Received unsupported message " << msg->messagetypename() << " with code " << code << " ignoring! " << std::endl;
694  }
695 
696  } else {
697  ERR << "Received unsupported message " << msg->messagetypename() << "ignoring" << std::endl;
698  }
699  }
700  }
701 
707  {
708  // read all stderr data so we get the full logs
709  auto ba = _workerProc->channelReadLine(Process::StdErr);
710  while ( !ba.empty() ) {
711  forwardToLog(std::string( ba.data(), ba.size() ) );
712  ba = _workerProc->channelReadLine(Process::StdErr);
713  }
714  }
715 
716  void ProvideQueue::forwardToLog( std::string &&logLine )
717  {
718  if ( (_capabilities.cfg_flags () & zypp::proto::Capabilities::ZyppLogFormat) == zypp::proto::Capabilities::ZyppLogFormat )
719  zypp::base::LogControl::instance ().logRawLine( std::move(logLine) );
720  else
721  MIL << "Message from worker: " << _capabilities.worker_name() << ":" << logLine << std::endl;
722  }
723 
724  void ProvideQueue::processReadyRead(int channel) {
725  // ignore stdout here
726  if ( channel == Process::StdOut )
727  return;
728 
729  // forward the stderr output to the log bypassing the formatter
730  // the worker already formatted the line
731  while ( _workerProc->canReadLine(Process::StdErr) ) {
732  const auto &data = _workerProc->channelReadLine( Process::StdErr );
733  if ( data.empty() )
734  return;
735 
736  forwardToLog(std::string( data.data(), data.size() ) );
737  }
738  }
739 
740  void ProvideQueue::procFinished(int exitCode)
741  {
742  // process all pending messages
743  processMessage();
744 
745  // get all of the log lines
746  readAllStderr();
747 
748  // shut down
749  // @todo implement worker restart in case of a unexpected exit
750  if ( !_queueShuttingDown )
751  immediateShutdown( ZYPP_EXCPT_PTR( zypp::media::MediaException("Unexpected queue worker exit!") ) );
752 
753 #if 0
754  if ( !_queueShuttingDown ) {
755 
756  _crashCounter++;
757  if ( _crashCounter > 3 ) {
758  immediateShutdown( ZYPP_EXCPT_PTR( zypp::media::MediaException("Unexpected queue worker exit!") ) );
759  return;
760  }
761 
762  MIL << "Unexpected queue worker exit with code: " << exitCode << std::endl;
763  // try to spawn the worker again, move active items back to wait list and start over
764 
765  if ( !doStartup () ) {
766 
767  }
768  }
769 #endif
770  }
771 
773  return _parent.nextRequestId();
774  }
775 }
std::list< Item > _activeItems
int assert_dir(const Pathname &path, unsigned mode)
Like &#39;mkdir -p&#39;.
Definition: PathInfo.cc:319
static ProvideMessage createErrorResponse(const uint32_t reqId, const uint code, const std::string &reason, bool transient=false)
#define MIL
Definition: Logger.h:96
const std::string & asString() const
std::optional< zypp::ManagedFile > addToFileCache(const zypp::Pathname &downloadedFile)
Definition: provide.cc:727
Signal< Provide::MediaChangeAction(const std::string &, const std::string &, const int32_t, const std::vector< std::string > &, const std::optional< std::string > &) > _sigMediaChange
Definition: provide_p.h:104
std::list< ProvideQueue::Item >::iterator dequeueActive(std::list< Item >::iterator it)
Store and operate with byte count.
Definition: ByteCount.h:30
static ProvideMessage createAuthInfo(const uint32_t reqId, const std::string &user, const std::string &pw, int64_t timestamp, const std::map< std::string, std::string > &extraValues={})
constexpr std::string_view ATTACH_POINT("zconfig://media/AttachPoint")
const zypp::Pathname & workerPath() const
Definition: provide.cc:830
RpcMessageStreamPtr _messageStream
const char * c_str() const
String representation.
Definition: Pathname.h:110
static ProvideMessage createMediaChanged(const uint32_t reqId)
void enqueue(ProvideRequestRef request)
Definition: providequeue.cc:92
bool provideDebugEnabled()
Definition: providedbg_p.h:28
uint32_t nextRequestId()
Definition: provide.cc:917
constexpr std::string_view PROVIDER_ROOT("zconfig://media/ProviderRoot")
bool canScheduleMore() const
std::optional< TimePoint > idleSince() const
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition: Exception.h:432
static constexpr uint32_t InvalidId
ProvideRequestRef _request
constexpr std::string_view Label("label")
bool isString() const
#define ERR
Definition: Logger.h:98
Signal< void()> _sigIdle
void cancel(ProvideRequest *item, std::exception_ptr error)
static LogControl instance()
Singleton access.
Definition: LogControl.h:102
bool isInCache(const zypp::Pathname &downloadedFile) const
Definition: provide.cc:747
bool empty() const
Test for an empty path.
Definition: Pathname.h:114
uint32_t nextRequestId()
zypp::ByteCount expectedProvideSize() const
void immediateShutdown(const std::exception_ptr &reason)
constexpr std::string_view LocalFilename("local_filename")
uint activeRequests() const
int unlink(const Pathname &path)
Like &#39;unlink&#39;.
Definition: PathInfo.cc:700
constexpr std::string_view LastAuthTimestamp("last_auth_timestamp")
const std::string & asString() const
String representation.
Definition: Pathname.h:91
const Config & workerConfig() const
std::string asString() const
Error message provided by dumpOn as string.
Definition: Exception.cc:75
void procFinished(int exitCode)
Just inherits Exception to separate media exceptions.
constexpr std::string_view Device("device")
Provides API related macros.
#define WAR
Definition: Logger.h:97
constexpr std::string_view MediaNr("media_nr")
bool isRunning() const
Definition: provide.cc:844
void forwardToLog(std::string &&logLine)
std::optional< TimePoint > _idleSince
ProvideQueue(ProvidePrivate &parent)
Definition: providequeue.cc:44
ProvidePrivate & _parent
void schedule(ScheduleReason reason)
Definition: provide.cc:38
const std::string queueName(ProvideQueue &q) const
Definition: provide.cc:835
constexpr std::string_view EffectiveUrl("effective_url")
Process::Ptr _workerProc
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
Definition: providequeue.cc:57
zypp::proto::Capabilities Config
SignalProxy< void()> sigIdle()
#define ZYPP_CAUGHT(EXCPT)
Drops a logline telling the Exception was caught (in order to handle it).
Definition: Exception.h:436
#define LIBZYPP_VERSION_STRING
Definition: APIConfig.h:15
zypp::Pathname _currentExe
Base class for Exception.
Definition: Exception.h:145
static ProvideMessage createCancel(const uint32_t reqId)
void logRawLine(std::string &&line)
will push a line to the logthread without formatting it
Definition: LogControl.cc:912
void fatalWorkerError(const std::exception_ptr &reason=nullptr)
constexpr std::string_view AGENT_STRING_CONF("zconfig://media/UserAgent")
Wrapper class for ::stat/::lstat.
Definition: PathInfo.h:220
std::list< ProvideQueue::Item >::iterator cancelActiveItem(std::list< Item >::iterator i, const std::exception_ptr &error)
zypp::proto::Configuration Configuration
Definition: provideworker.h:33
static expected< ProvideMessage > create(const zyppng::RpcMessage &message)
uint requestCount() const
std::deque< Item > _waitQueue
constexpr std::string_view Desc("desc")
constexpr std::string_view CacheHit("cacheHit")
void processReadyRead(int channel)
Url manipulation class.
Definition: Url.h:91
#define DBG
Definition: Logger.h:95
constexpr std::string_view ExpectedFilesize("expected_filesize")
const std::string & hostname() const