11 #include <zypp-core/base/DtorReset> 12 #include <zypp-core/AutoDispose.h> 13 #include <zypp-core/Url.h> 14 #include <zypp-core/Date.h> 15 #include <zypp-core/zyppng/pipelines/AsyncResult> 16 #include <zypp-core/base/LogControl.h> 17 #include <zypp-core/fs/PathInfo.h> 18 #include <zypp-core/fs/TmpPath.h> 19 #include <zypp-core/zyppng/base/private/threaddata_p.h> 20 #include <zypp-core/zyppng/base/AutoDisconnect> 21 #include <zypp-core/zyppng/base/EventDispatcher> 22 #include <zypp-media/MediaConfig> 28 #undef ZYPP_BASE_LOGGER_LOGGROUP 29 #define ZYPP_BASE_LOGGER_LOGGROUP "ProvideWorker" 43 ThreadData::current().setName( workerName );
50 connectFunc( *
_delayedShutdown, &Timer::sigExpired, [
this]( zyppng::Timer & ) {
81 if ( !
_controlIO->openFds( { recv }, send ) ) {
91 AutoDisconnect disC[] = {
98 return expected<void>::success();
123 return std::make_shared<ProvideWorkerItem>( std::move(spec) );
130 , localFile.
empty () ? std::optional<std::string>() : localFile.
asString ()
131 , stagingFile.
empty () ? std::optional<std::string>() : stagingFile.
asString ()
133 ERR <<
"Failed to send ProvideStart message" << std::endl;
139 MIL_PRV <<
"Sending provideSuccess for id " <<
id <<
" file " << localFile << std::endl;
142 for (
const auto &val : i->second )
145 if ( !
_stream->sendMessage( msg.impl() ) ) {
146 ERR <<
"Failed to send ProvideSuccess message" << std::endl;
152 MIL_PRV <<
"Sending provideFailed for request " <<
id <<
" err: " << reason << std::endl;
155 for (
const auto &val : i->second )
156 msg.addValue( i->first, val );
158 if ( !
_stream->sendMessage( msg.impl() ) ) {
159 ERR <<
"Failed to send ProvideFailed message" << std::endl;
180 MIL_PRV <<
"Sending attachSuccess for request " <<
id << std::endl;
182 ERR <<
"Failed to send AttachFinished message" << std::endl;
184 MIL <<
"Sent back attach success" << std::endl;
190 MIL_PRV <<
"Sending detachSuccess for request " <<
id << std::endl;
192 ERR <<
"Failed to send DetachFinished message" << std::endl;
210 const auto &msg =
_stream->nextMessageWait() | [&](
auto &&nextMessage ) {
211 if ( !nextMessage ) {
217 return expected<RpcMessage>::success( std::move(*nextMessage) );
218 } | mbind ( [&](
auto && m) {
223 ERR <<
"Failed to receive message" << std::endl;
227 if ( std::find( responseCodes.begin (), responseCodes.end(), msg->code() ) != responseCodes.end() ) {
232 MIL <<
"Remembering message for later: " << msg->code () << std::endl;
236 return expected<ProvideMessage>::error(
_fatalError );
242 | [&]( expected<ProvideMessage> &&m ) {
244 MIL <<
"Failed to wait for message, aborting the request " << std::endl;
245 return ProvideWorker::MediaChangeRes::ABORT;
248 if ( m->code() == ProvideMessage::Code::MediaChanged )
249 return ProvideWorker::MediaChangeRes::SUCCESS;
250 else if ( m->code() == ProvideMessage::Code::MediaChangeSkip )
251 return ProvideWorker::MediaChangeRes::SKIP;
253 return ProvideWorker::MediaChangeRes::ABORT;
261 if ( m.code() == ProvideMessage::Code::AuthInfo ) {
273 ERR <<
"Ignoring invalid extra value, " << name <<
" is not of type string" << std::endl;
279 return expected<AuthInfo>::success(inf);
293 const auto &helo =
_stream->nextMessageWait();
295 ERR <<
"Could not receive a handshake message, aborting" << std::endl;
302 return expected<void>::error(exp.error());
305 return std::move(*exp) | [&](
auto &&conf ) {
310 for(
const auto &[key,value] :
_workerConf.values() ) {
321 caps.set_cfg_flags ( WorkerCaps::Flags(caps.cfg_flags() | WorkerCaps::ZyppLogFormat) );
322 if ( !
_stream->sendMessage ( caps ) ) {
325 return expected<void>::success ();
365 MIL <<
"Read FD closed, exiting." << std::endl;
371 MIL <<
"Write FD closed, exiting." << std::endl;
377 while (
auto message =
_stream->nextMessage() ) {
391 ERR <<
"Received a invalid message on the input stream, aborting" << std::endl;
402 const auto code =
provide.code();
404 if ( code >= ProvideMessage::Code::FirstControllerCode && code <= ProvideMessage::Code::LastControllerCode ) {
406 MIL_PRV <<
"Received request: " << code << std::endl;
408 if ( code == ProvideMessage::Code::Cancel ) {
411 switch ( (*i)->_state ) {
422 MIL <<
"Received Cancel for unknown request: " <<
provide.requestId() <<
", ignoring!" << std::endl;
430 ERR <<
"Unsupported request with code: " << code <<
" received!" << std::endl;
435 const auto &handle = [&](
const RpcMessage &message ){
436 const auto &msgTypeName = message.messagetypename();
437 if ( msgTypeName == rpc::messageTypeName<zypp::proto::ProvideMessage>() ) {
442 return expected<void>::success();
445 return expected<void>::error(
ZYPP_EXCPT_PTR( std::invalid_argument(
zypp::str::Str()<<
"Unknown message received: " << message.messagetypename())) );
448 const auto &exp = handle( message );
451 std::rethrow_exception ( exp.error () );
453 ERR <<
"Catched exception during message handling: " << e << std::endl;
454 }
catch (
const std::exception &e ) {
455 ERR <<
"Catched exception during message handling: " << e.what()<< std::endl;
457 ERR <<
"Unknown Exception during message handling" << std::endl;
std::string getScheme() const
Returns the scheme name of the URL.
void enableLogForwardingMode(bool enable=true)
static ProvideMessage createErrorResponse(const uint32_t reqId, const uint code, const std::string &reason, bool transient=false)
ProvideNotificatioMode provNotificationMode() const
ProvideNotificatioMode _provNotificationMode
void pushSingleMessage(const RpcMessage &msg)
std::exception_ptr _fatalError
virtual void cancel(const std::deque< ProvideWorkerItemRef >::iterator &request)=0
zypp::proto::Capabilities WorkerCaps
Timer::Ptr _delayedShutdown
expected< ProvideMessage > parseReceivedMessage(const RpcMessage &m)
void maybeDelayedShutdown()
Configuration _workerConf
static ProvideMessage createAttachFinished(const uint32_t reqId)
expected< void > run(int recv=STDIN_FILENO, int send=STDOUT_FILENO)
RpcMessageStream::Ptr _stream
void readFdClosed(uint, AsyncDataSource::ChannelCloseReason)
std::string_view _workerName
void attachSuccess(const uint32_t id)
void provideStart(const uint32_t id, const zypp::Url &url, const zypp::Pathname &localFile, const zypp::Pathname &stagingFile={})
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
void addValue(const std::string &name, const FieldVal &value)
bool _inControllerRequest
void provideFailed(const uint32_t id, const uint code, const std::string &reason, const bool transient, const HeaderValueMap extra={})
MediaChangeRes requestMediaChange(const uint32_t id, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc={})
static ProvideMessage createProvideStarted(const uint32_t reqId, const zypp::Url &url, const std::optional< std::string > &localFilename={}, const std::optional< std::string > &stagingFilename={})
constexpr std::string_view Password("password")
void invalidMessageReceived(std::exception_ptr p)
constexpr std::string_view Username("username")
static LogControl instance()
Singleton access.
expected< ProvideMessage > sendAndWaitForResponse(const ProvideMessage &request, const std::vector< uint > &responseCodes)
Assign a variable a certain value when going out of scope.
void logToStdErr()
Log to std::err.
bool empty() const
Test for an empty path.
void setProvNotificationMode(const ProvideNotificatioMode &provNotificationMode)
virtual void immediateShutdown()
Convenient building of std::string via std::ostringstream. Basically a std::ostringstream autoconverti...
ProvideWorker(std::string_view workerName)
const std::string & asString() const
String representation.
std::string asUserString() const
Translated error message as string suitable for the user.
void messageLoop(Timer &)
static ProvideMessage createAuthDataRequest(const uint32_t reqId, const zypp::Url &effectiveUrl, const std::string &lastTriedUser="", const std::optional< int64_t > &lastAuthTimestamp={}, const std::map< std::string, std::string > &extraValues={})
virtual ProvideWorkerItemRef makeItem(ProvideMessage &&spec)
bool historyEmpty() const
Whether the history list is empty.
AsyncDataSource & controlIO()
std::string historyAsString() const
The history as string.
expected< void > executeHandshake()
RpcMessageStream::Ptr messageStream() const
std::string getAuthority() const
Returns the encoded authority component of the URL.
int64_t last_auth_timestamp
std::deque< ProvideWorkerItemRef > & requestQueue()
constexpr std::string_view AuthTimestamp("auth_timestamp")
std::map< std::string, std::string > extraKeys
Base class for Exception.
void handleSingleMessage(const ProvideMessage &provide)
std::string getPathName(EEncoding eflag=zypp::url::E_DECODED) const
Returns the path name from the URL.
static ProvideMessage createProvideFinished(const uint32_t reqId, const std::string &localFilename, bool cacheHit)
AsyncDataSource::Ptr _controlIO
void detachSuccess(const uint32_t id)
constexpr std::string_view History("history")
void onInvalidMessageReceived()
expected< AuthInfo > requireAuthorization(const uint32_t id, const zypp::Url &url, const std::string &lastTriedUsername="", const int64_t lastTimestamp=-1, const std::map< std::string, std::string > &extraFields={})
void writeFdClosed(AsyncDataSource::ChannelCloseReason)
zypp::proto::Configuration Configuration
std::deque< ProvideWorkerItemRef > _pendingProvides
Easy-to use interface to the ZYPP dependency resolver.
zypp::proto::ProvideMessage & impl()
static expected< ProvideMessage > create(const zyppng::RpcMessage &message)
void provideSuccess(const uint32_t id, bool cacheHit, const zypp::Pathname &localFile, const HeaderValueMap extra={})
static ProvideMessage createDetachFinished(const uint32_t reqId)
std::deque< ProvideMessage > _pendingMessages
bool provide(const Pathname &delta_r, const Pathname &new_r, const Progress &report_r)
Apply a binary delta to on-disk data to re-create a new rpm.
virtual expected< WorkerCaps > initialize(const Configuration &conf)=0
static ProvideMessage createMediaChangeRequest(const uint32_t reqId, const std::string &label, int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)