libzypp  17.31.31
provideworker.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 \---------------------------------------------------------------------*/
9 
10 #include "provideworker.h"
11 #include <zypp-core/base/DtorReset>
12 #include <zypp-core/AutoDispose.h>
13 #include <zypp-core/Url.h>
14 #include <zypp-core/Date.h>
15 #include <zypp-core/zyppng/pipelines/AsyncResult>
16 #include <zypp-core/base/LogControl.h>
17 #include <zypp-core/fs/PathInfo.h>
18 #include <zypp-core/fs/TmpPath.h>
19 #include <zypp-core/zyppng/base/private/threaddata_p.h>
20 #include <zypp-core/zyppng/base/AutoDisconnect>
21 #include <zypp-core/zyppng/base/EventDispatcher>
22 #include <zypp-media/MediaConfig>
23 #include <ostream>
24 #include <fstream>
25 
27 
28 #undef ZYPP_BASE_LOGGER_LOGGROUP
29 #define ZYPP_BASE_LOGGER_LOGGROUP "ProvideWorker"
30 
31 namespace zyppng::worker {
32 
33  using namespace zyppng::operators;
34 
35  RequestCancelException::RequestCancelException() : zypp::media::MediaException ("Request was cancelled")
36  { }
37 
38  ProvideWorker::ProvideWorker(std::string_view workerName) : _workerName(workerName)
39  {
40  // do not change the order of these calls, otherwise showing the threadname does not work
41  // enableLogForwardingMode will initialize the log which would override the current thread name
43  ThreadData::current().setName( workerName );
44 
45  // we use a singleshot timer that triggers message handling
46  connect( *_msgAvail, &Timer::sigExpired, *this, &ProvideWorker::messageLoop );
47  _msgAvail->setSingleShot(true);
48 
49  // another timer to trigger a delayed shutdown
50  connectFunc( *_delayedShutdown, &Timer::sigExpired, [this]( zyppng::Timer & ) {
52  }, *this );
53  _delayedShutdown->setSingleShot(true);
54  }
55 
57  { }
58 
59  RpcMessageStream::Ptr ProvideWorker::messageStream() const
60  {
61  return _stream;
62  }
63 
64  expected<void> ProvideWorker::run( int recv, int send )
65  {
66  // reentry not supported
67  assert ( !_isRunning );
68 
70  _isRunning = true;
71 
72  initLog();
73 
74  zypp::OnScopeExit cleanup([&](){
75  _stream.reset();
76  _controlIO.reset();
77  _loop.reset();
78  });
79 
80  _controlIO = AsyncDataSource::create();
81  if ( !_controlIO->openFds( { recv }, send ) ) {
82  return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to open control FDs")) );
83  }
84 
85  connect( *_controlIO, &AsyncDataSource::sigReadFdClosed, *this, &ProvideWorker::readFdClosed );
86  connect( *_controlIO, &AsyncDataSource::sigWriteFdClosed, *this, &ProvideWorker::writeFdClosed );
87 
88  _stream = RpcMessageStream::create( _controlIO );
89 
90  return executeHandshake () | mbind( [&]() {
91  AutoDisconnect disC[] = {
92  connect( *_stream, &RpcMessageStream::sigMessageReceived, *this, &ProvideWorker::messageReceived ),
93  connect( *_stream, &RpcMessageStream::sigInvalidMessageReceived, *this, &ProvideWorker::onInvalidMessageReceived )
94  };
95  _loop->run();
96  if ( _fatalError )
97  return expected<void>::error( _fatalError );
98  return expected<void>::success();
99  });
100  }
101 
102  std::deque<ProvideWorkerItemRef> &ProvideWorker::requestQueue()
103  {
104  return _pendingProvides;
105  }
106 
108  return _provNotificationMode;
109  }
110 
113  }
114 
116  {
117  // by default we log to strErr, if user code wants to change that it can overload this function
119  }
120 
121  ProvideWorkerItemRef ProvideWorker::makeItem( ProvideMessage &&spec )
122  {
123  return std::make_shared<ProvideWorkerItem>( std::move(spec) );
124  }
125 
126  void ProvideWorker::provideStart(const uint32_t id, const zypp::Url &url, const zypp::filesystem::Pathname &localFile, const zypp::Pathname &stagingFile )
127  {
128  if ( !_stream->sendMessage( ProvideMessage::createProvideStarted ( id
129  , url
130  , localFile.empty () ? std::optional<std::string>() : localFile.asString ()
131  , stagingFile.empty () ? std::optional<std::string>() : stagingFile.asString ()
132  ).impl() ) ) {
133  ERR << "Failed to send ProvideStart message" << std::endl;
134  }
135  }
136 
137  void ProvideWorker::provideSuccess(const uint32_t id, bool cacheHit, const zypp::filesystem::Pathname &localFile, const HeaderValueMap extra )
138  {
139  MIL_PRV << "Sending provideSuccess for id " << id << " file " << localFile << std::endl;
140  auto msg = ProvideMessage::createProvideFinished( id ,localFile.asString() ,cacheHit);
141  for ( auto i = extra.beginList (); i != extra.endList(); i++ ) {
142  for ( const auto &val : i->second )
143  msg.addValue( i->first, val );
144  }
145  if ( !_stream->sendMessage( msg.impl() ) ) {
146  ERR << "Failed to send ProvideSuccess message" << std::endl;
147  }
148  }
149 
150  void ProvideWorker::provideFailed(const uint32_t id, const uint code, const std::string &reason, const bool transient, const HeaderValueMap extra )
151  {
152  MIL_PRV << "Sending provideFailed for request " << id << " err: " << reason << std::endl;
153  auto msg = ProvideMessage::createErrorResponse ( id, code, reason, transient );
154  for ( auto i = extra.beginList (); i != extra.endList(); i++ ) {
155  for ( const auto &val : i->second )
156  msg.addValue( i->first, val );
157  }
158  if ( !_stream->sendMessage( msg.impl() ) ) {
159  ERR << "Failed to send ProvideFailed message" << std::endl;
160  }
161  }
162 
163 
164  void ProvideWorker::provideFailed ( const uint32_t id, const uint code, const bool transient, const zypp::Exception &e )
165  {
167  if ( !e.historyEmpty() ) {
169  }
170  provideFailed( id
171  , code
172  , e.asUserString()
173  , transient
174  , extra );
175  }
176 
177 
178  void ProvideWorker::attachSuccess(const uint32_t id)
179  {
180  MIL_PRV << "Sending attachSuccess for request " << id << std::endl;
181  if ( !_stream->sendMessage( ProvideMessage::createAttachFinished ( id ).impl() ) ) {
182  ERR << "Failed to send AttachFinished message" << std::endl;
183  } else {
184  MIL << "Sent back attach success" << std::endl;
185  }
186  }
187 
188  void ProvideWorker::detachSuccess(const uint32_t id)
189  {
190  MIL_PRV << "Sending detachSuccess for request " << id << std::endl;
191  if ( !_stream->sendMessage( ProvideMessage::createDetachFinished ( id ).impl() ) ) {
192  ERR << "Failed to send DetachFinished message" << std::endl;
193  }
194  }
195 
196  expected<ProvideMessage> ProvideWorker::sendAndWaitForResponse( const ProvideMessage &request , const std::vector<uint> &responseCodes )
197  {
198  // make sure immediateShutdown is not called while we are blocking here
199  zypp::DtorReset delayedReset( _inControllerRequest );
200  _inControllerRequest = true;
201 
202  if ( !_stream->sendMessage( request.impl() ) )
203  return expected<ProvideMessage>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to send message")) );
204 
205  // flush the io device, this will block until all bytes are written
206  _controlIO->flush();
207 
208  while ( !_fatalError ) {
209 
210  const auto &msg = _stream->nextMessageWait() | [&]( auto &&nextMessage ) {
211  if ( !nextMessage ) {
212  if ( _fatalError )
213  return expected<RpcMessage>::error( _fatalError );
214  else
215  return expected<RpcMessage>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to wait for response")) );
216  }
217  return expected<RpcMessage>::success( std::move(*nextMessage) );
218  } | mbind ( [&]( auto && m) {
219  return parseReceivedMessage(m);
220  } );
221 
222  if ( !msg ) {
223  ERR << "Failed to receive message" << std::endl;
224  return msg;
225  }
226 
227  if ( std::find( responseCodes.begin (), responseCodes.end(), msg->code() ) != responseCodes.end() ) {
228  return msg;
229  }
230 
231  // remember other messages for later
232  MIL << "Remembering message for later: " << msg->code () << std::endl;
233  _pendingMessages.push_back(*msg);
234  _msgAvail->start(0);
235  }
236  return expected<ProvideMessage>::error( _fatalError );
237  }
238 
239  ProvideWorker::MediaChangeRes ProvideWorker::requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector<std::string> &devices, const std::optional<std::string> &desc )
240  {
241  return sendAndWaitForResponse( ProvideMessage::createMediaChangeRequest ( id, label, mediaNr, devices, desc ), { ProvideMessage::Code::MediaChanged, ProvideMessage::Code::MediaChangeAbort, ProvideMessage::Code::MediaChangeSkip } )
242  | [&]( expected<ProvideMessage> &&m ) {
243  if ( !m ) {
244  MIL << "Failed to wait for message, aborting the request " << std::endl;
245  return ProvideWorker::MediaChangeRes::ABORT;
246  }
247  MIL << "Wait finished, with messages still pending: " << this->_pendingMessages.size() << " and provs still pending: " << this->_pendingProvides.size() << std::endl;
248  if ( m->code() == ProvideMessage::Code::MediaChanged )
249  return ProvideWorker::MediaChangeRes::SUCCESS;
250  else if ( m->code() == ProvideMessage::Code::MediaChangeSkip )
251  return ProvideWorker::MediaChangeRes::SKIP;
252  else
253  return ProvideWorker::MediaChangeRes::ABORT;
254  };
255  }
256 
257  expected<AuthInfo> ProvideWorker::requireAuthorization( const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername, const int64_t lastTimestamp, const std::map<std::string, std::string> &extraFields )
258  {
259  return sendAndWaitForResponse( ProvideMessage::createAuthDataRequest( id, url, lastTriedUsername, lastTimestamp, extraFields ), { ProvideMessage::Code::AuthInfo, ProvideMessage::Code::NoAuthData } )
260  | mbind( [&]( ProvideMessage &&m ) {
261  if ( m.code() == ProvideMessage::Code::AuthInfo ) {
262 
263  AuthInfo inf;
264  m.forEachVal( [&]( const std::string &name, const ProvideMessage::FieldVal &val ) {
265  if ( name == AuthInfoMsgFields::Username ) {
266  inf.username = val.asString();
267  } else if ( name == AuthInfoMsgFields::Password ) {
268  inf.password = val.asString();
269  } else if ( name == AuthInfoMsgFields::AuthTimestamp ) {
270  inf.last_auth_timestamp = val.asInt64();
271  } else {
272  if ( !val.isString() ) {
273  ERR << "Ignoring invalid extra value, " << name << " is not of type string" << std::endl;
274  }
275  inf.extraKeys[name] = val.asString();
276  }
277  return true;
278  });
279  return expected<AuthInfo>::success(inf);
280 
281  }
282  return expected<AuthInfo>::error( ZYPP_EXCPT_PTR( zypp::media::MediaException("No Auth data")) );
283  });
284  }
285 
286  AsyncDataSource &ProvideWorker::controlIO()
287  {
288  return *_controlIO.get();
289  }
290 
292  {
293  const auto &helo = _stream->nextMessageWait();
294  if ( !helo ) {
295  ERR << "Could not receive a handshake message, aborting" << std::endl;
296  return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to receive handshake message")) );;
297  }
298 
299  auto exp = _stream->parseMessage<zypp::proto::Configuration>( *helo );
300  if ( !exp ) {
301  invalidMessageReceived( exp.error() );
302  return expected<void>::error(exp.error());
303  }
304 
305  return std::move(*exp) | [&]( auto &&conf ) {
306 
307  _workerConf = std::move(conf);
308 
309  auto &mediaConf = zypp::MediaConfig::instance();
310  for( const auto &[key,value] : _workerConf.values() ) {
311  zypp::Url keyUrl( key );
312  if ( keyUrl.getScheme() == "zconfig" && keyUrl.getAuthority() == "main" ) {
313  mediaConf.setConfigValue( keyUrl.getAuthority(), zypp::Pathname(keyUrl.getPathName()).basename(), value );
314  }
315  }
316 
317  return initialize( _workerConf ) | mbind([&]( WorkerCaps &&caps ){
318 
319  caps.set_worker_name( _workerName.data() );
320 
321  caps.set_cfg_flags ( WorkerCaps::Flags(caps.cfg_flags() | WorkerCaps::ZyppLogFormat) );
322  if ( !_stream->sendMessage ( caps ) ) {
323  return expected<void>::error( ZYPP_EXCPT_PTR(zypp::Exception("Failed to send capabilities")) );
324  }
325  return expected<void>::success ();
326  });
327  };
328  }
329 
331  {
332  if ( _fatalError )
333  return;
334 
335  while ( _pendingMessages.size () ) {
336  auto m = _pendingMessages.front ();
337  _pendingMessages.pop_front ();
339  }
340 
341  if ( !_fatalError && _pendingProvides.size() ) {
342  provide();
343  }
344 
345  // keep poking until there are no provides anymore
346  if ( !_fatalError && ( _pendingMessages.size() || ( _pendingProvides.size () && _provNotificationMode == QUEUE_NOT_EMTPY ) ) ) {
347  _msgAvail->start(0);
348  }
349 
350  }
351 
353  {
354  if ( _inControllerRequest ) {
355  _delayedShutdown->start(0);
356  return;
357  }
358 
360  _loop->quit ();
361  }
362 
363  void ProvideWorker::readFdClosed( uint, AsyncDataSource::ChannelCloseReason )
364  {
365  MIL << "Read FD closed, exiting." << std::endl;
367  }
368 
369  void ProvideWorker::writeFdClosed( AsyncDataSource::ChannelCloseReason )
370  {
371  MIL << "Write FD closed, exiting." << std::endl;
373  }
374 
376  {
377  while ( auto message = _stream->nextMessage() ) {
378  if ( _fatalError )
379  break;
380  pushSingleMessage(*message);
381  }
382  }
383 
385  {
386  invalidMessageReceived( std::exception_ptr() );
387  }
388 
389  void ProvideWorker::invalidMessageReceived( std::exception_ptr p )
390  {
391  ERR << "Received a invalid message on the input stream, aborting" << std::endl;
392  if ( p )
393  _fatalError = p;
394  else
395  _fatalError = ZYPP_EXCPT_PTR( InvalidMessageReceivedException() );
397  _loop->quit();
398  }
399 
401  {
402  const auto code = provide.code();
403  // we only accept requests here
404  if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
405 
406  MIL_PRV << "Received request: " << code << std::endl;
407 
408  if ( code == ProvideMessage::Code::Cancel ) {
409  const auto &i = std::find_if( _pendingProvides.begin (), _pendingProvides.end(), [ id = provide.requestId() ]( const auto &it ){ return it->_spec.requestId() == id; } );
410  if ( i != _pendingProvides.end() ) {
411  switch ( (*i)->_state ) {
413  _stream->sendMessage ( ProvideMessage::createErrorResponse ( provide.requestId (), ProvideMessage::Code::Cancelled, "Cancelled by user." ).impl() );
414  _pendingProvides.erase(i);
415  break;
417  cancel(i);
418  break;
420  break;
421  }
422  MIL << "Received Cancel for unknown request: " << provide.requestId() << ", ignoring!" << std::endl;
423  }
424  return;
425  }
426 
428  return;
429  }
430  ERR << "Unsupported request with code: " << code << " received!" << std::endl;
431  }
432 
433  void ProvideWorker::pushSingleMessage( const RpcMessage &message )
434  {
435  const auto &handle = [&]( const RpcMessage &message ){
436  const auto &msgTypeName = message.messagetypename();
437  if ( msgTypeName == rpc::messageTypeName<zypp::proto::ProvideMessage>() ) {
438  return parseReceivedMessage( message )
439  | mbind( [&]( ProvideMessage &&provide ){
440  _pendingMessages.push_back(provide);
441  _msgAvail->start(0);
442  return expected<void>::success();
443  });
444  }
445  return expected<void>::error( ZYPP_EXCPT_PTR( std::invalid_argument(zypp::str::Str()<<"Unknown message received: " << message.messagetypename())) );
446  };
447 
448  const auto &exp = handle( message );
449  if ( !exp ) {
450  try {
451  std::rethrow_exception ( exp.error () );
452  } catch ( const zypp::Exception &e ) {
453  ERR << "Catched exception during message handling: " << e << std::endl;
454  } catch ( const std::exception &e ) {
455  ERR << "Catched exception during message handling: " << e.what()<< std::endl;
456  } catch ( ... ) {
457  ERR << "Unknown Exception during message handling" << std::endl;
458  }
459  }
460  }
461 
462  expected<ProvideMessage> ProvideWorker::parseReceivedMessage(const RpcMessage &m)
463  {
464  auto exp = ProvideMessage::create(m);
465  if ( !exp )
466  invalidMessageReceived( exp.error() );
467  return exp;
468  }
469 }
std::string getScheme() const
Returns the scheme name of the URL.
Definition: Url.cc:533
void enableLogForwardingMode(bool enable=true)
Definition: LogControl.cc:881
static ProvideMessage createErrorResponse(const uint32_t reqId, const uint code, const std::string &reason, bool transient=false)
#define MIL
Definition: Logger.h:96
const std::string & asString() const
ProvideNotificatioMode provNotificationMode() const
ProvideNotificatioMode _provNotificationMode
void pushSingleMessage(const RpcMessage &msg)
std::exception_ptr _fatalError
virtual void cancel(const std::deque< ProvideWorkerItemRef >::iterator &request)=0
zypp::proto::Capabilities WorkerCaps
Definition: provideworker.h:31
expected< ProvideMessage > parseReceivedMessage(const RpcMessage &m)
ValueMap::iterator endList()
static ProvideMessage createAttachFinished(const uint32_t reqId)
expected< void > run(int recv=STDIN_FILENO, int send=STDOUT_FILENO)
RpcMessageStream::Ptr _stream
void readFdClosed(uint, AsyncDataSource::ChannelCloseReason)
void attachSuccess(const uint32_t id)
void provideStart(const uint32_t id, const zypp::Url &url, const zypp::Pathname &localFile, const zypp::Pathname &stagingFile={})
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition: Exception.h:432
void addValue(const std::string &name, const FieldVal &value)
void provideFailed(const uint32_t id, const uint code, const std::string &reason, const bool transient, const HeaderValueMap extra={})
MediaChangeRes requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc={})
bool isString() const
#define ERR
Definition: Logger.h:98
static ProvideMessage createProvideStarted(const uint32_t reqId, const zypp::Url &url, const std::optional< std::string > &localFilename={}, const std::optional< std::string > &stagingFilename={})
constexpr std::string_view Password("password")
void invalidMessageReceived(std::exception_ptr p)
constexpr std::string_view Username("username")
int64_t asInt64() const
static LogControl instance()
Singleton access.
Definition: LogControl.h:102
expected< ProvideMessage > sendAndWaitForResponse(const ProvideMessage &request, const std::vector< uint > &responseCodes)
Assign a variable a certain value when going out of scope.
Definition: dtorreset.h:49
void logToStdErr()
Log to std::err.
Definition: LogControl.cc:894
bool empty() const
Test for an empty path.
Definition: Pathname.h:114
void setProvNotificationMode(const ProvideNotificatioMode &provNotificationMode)
virtual void immediateShutdown()
Definition: provideworker.h:87
Convenient building of std::string via std::ostringstream Basically a std::ostringstream autoconverti...
Definition: String.h:211
ProvideWorker(std::string_view workerName)
const std::string & asString() const
String representation.
Definition: Pathname.h:91
Just inherits Exception to separate media exceptions.
std::string asUserString() const
Translated error message as string suitable for the user.
Definition: Exception.cc:82
static ProvideMessage createAuthDataRequest(const uint32_t reqId, const zypp::Url &effectiveUrl, const std::string &lastTriedUser="", const std::optional< int64_t > &lastAuthTimestamp={}, const std::map< std::string, std::string > &extraValues={})
virtual ProvideWorkerItemRef makeItem(ProvideMessage &&spec)
bool historyEmpty() const
Whether the history list is empty.
Definition: Exception.h:262
AsyncDataSource & controlIO()
std::string historyAsString() const
The history as string.
Definition: Exception.cc:146
expected< void > executeHandshake()
RpcMessageStream::Ptr messageStream() const
std::string getAuthority() const
Returns the encoded authority component of the URL.
Definition: Url.cc:541
ValueMap::iterator beginList()
#define MIL_PRV
Definition: providedbg_p.h:35
std::deque< ProvideWorkerItemRef > & requestQueue()
constexpr std::string_view AuthTimestamp("auth_timestamp")
std::map< std::string, std::string > extraKeys
Definition: provideworker.h:40
Base class for Exception.
Definition: Exception.h:145
void handleSingleMessage(const ProvideMessage &provide)
std::string getPathName(EEncoding eflag=zypp::url::E_DECODED) const
Returns the path name from the URL.
Definition: Url.cc:604
static ProvideMessage createProvideFinished(const uint32_t reqId, const std::string &localFilename, bool cacheHit)
AsyncDataSource::Ptr _controlIO
void detachSuccess(const uint32_t id)
constexpr std::string_view History("history")
expected< AuthInfo > requireAuthorization(const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername="", const int64_t lastTimestamp=-1, const std::map< std::string, std::string > &extraFields={})
void writeFdClosed(AsyncDataSource::ChannelCloseReason)
zypp::proto::Configuration Configuration
Definition: provideworker.h:33
std::deque< ProvideWorkerItemRef > _pendingProvides
Easy-to use interface to the ZYPP dependency resolver.
Definition: CodePitfalls.doc:1
zypp::proto::ProvideMessage & impl()
static expected< ProvideMessage > create(const zyppng::RpcMessage &message)
void provideSuccess(const uint32_t id, bool cacheHit, const zypp::Pathname &localFile, const HeaderValueMap extra={})
static ProvideMessage createDetachFinished(const uint32_t reqId)
std::deque< ProvideMessage > _pendingMessages
bool provide(const Pathname &delta_r, const Pathname &new_r, const Progress &report_r)
Apply a binary delta to on-disk data to re-create a new rpm.
Url manipulation class.
Definition: Url.h:91
virtual expected< WorkerCaps > initialize(const Configuration &conf)=0
static ProvideMessage createMediaChangeRequest(const uint32_t reqId, const std::string &label, int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)
static MediaConfig & instance()
Definition: mediaconfig.cc:46