XRootD
Loading...
Searching...
No Matches
XrdTpcState.cc
Go to the documentation of this file.
1
2#include <algorithm>
3#include <sstream>
4#include <stdexcept>
5
6#include "XrdVersion.hh"
9
10#include <curl/curl.h>
11
12#include "XrdTpcState.hh"
13#include "XrdTpcStream.hh"
14
15using namespace TPC;
16
17
19 if (m_headers) {
20 curl_slist_free_all(m_headers);
21 m_headers = NULL;
22 if (m_curl) {curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers);}
23 }
24}
25
26
27void State::Move(State &other)
28{
29 m_push = other.m_push;
30 m_recv_status_line = other.m_recv_status_line;
31 m_recv_all_headers = other.m_recv_all_headers;
32 m_offset = other.m_offset;
33 m_start_offset = other.m_start_offset;
34 m_status_code = other.m_status_code;
35 m_content_length = other.m_content_length;
36 m_push_length = other.m_push_length;
37 m_stream = other.m_stream;
38 m_curl = other.m_curl;
39 m_headers = other.m_headers;
40 m_headers_copy = other.m_headers_copy;
41 m_resp_protocol = other.m_resp_protocol;
42 m_is_transfer_state = other.m_is_transfer_state;
43 curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this);
44 if (m_is_transfer_state) {
45 if (m_push) {
46 curl_easy_setopt(m_curl, CURLOPT_READDATA, this);
47 } else {
48 curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this);
49 }
50 }
51 tpcForwardCreds = other.tpcForwardCreds;
52 other.m_headers_copy.clear();
53 other.m_curl = NULL;
54 other.m_headers = NULL;
55 other.m_stream = NULL;
56}
57
58
59bool State::InstallHandlers(CURL *curl) {
60 curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XrdVERSION);
61 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &State::HeaderCB);
62 curl_easy_setopt(curl, CURLOPT_HEADERDATA, this);
63 if(m_is_transfer_state) {
64 if (m_push) {
65 curl_easy_setopt(curl, CURLOPT_UPLOAD, 1);
66 curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB);
67 curl_easy_setopt(curl, CURLOPT_READDATA, this);
68 struct stat buf;
69 if (SFS_OK == m_stream->Stat(&buf)) {
70 m_push_length = buf.st_size;
71 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size);
72 }
73 } else {
74 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB);
75 curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
76 }
77 }
78 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
79 if(tpcForwardCreds) {
80 curl_easy_setopt(curl,CURLOPT_UNRESTRICTED_AUTH,1L);
81 }
82
83 // Only use low-speed limits with libcurl v7.38 or later.
84 // Older versions have poor transfer performance, corrected in curl commit cacdc27f.
85 curl_version_info_data *curl_ver = curl_version_info(CURLVERSION_NOW);
86 if (curl_ver->age > 0 && curl_ver->version_num >= 0x072600) {
87 // Require a minimum speed from the transfer: 2 minute average must at least 10KB/s
88 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 2*60);
89 curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 10*1024);
90 }
91 return true;
92}
93
102 struct curl_slist *list = NULL;
103 for (std::map<std::string, std::string>::const_iterator hdr_iter = req.headers.begin();
104 hdr_iter != req.headers.end();
105 hdr_iter++) {
106 if (!strcasecmp(hdr_iter->first.c_str(),"copy-header")) {
107 list = curl_slist_append(list, hdr_iter->second.c_str());
108 m_headers_copy.emplace_back(hdr_iter->second);
109 }
110 // Note: len("TransferHeader") == 14
111 if (!strncasecmp(hdr_iter->first.c_str(),"transferheader",14)) {
112 std::stringstream ss;
113 ss << hdr_iter->first.substr(14) << ": " << hdr_iter->second;
114 list = curl_slist_append(list, ss.str().c_str());
115 m_headers_copy.emplace_back(ss.str());
116 }
117 }
118
119 if (m_is_transfer_state && m_push && m_push_length > 0) {
120 // On libcurl 8.5.0 - 8.9.1, we've observed bugs causing failures whenever
121 // `Expect: 100-continue` is not used. Older versions of libcurl unconditionally
122 // set `Expect` whenever PUT is used (likely an older bug). To workaround the issue,
123 // we force `Expect` to be set, triggering the older libcurl behavior.
124 // See: https://github.com/xrootd/xrootd/issues/2470
125 // See: https://github.com/curl/curl/issues/17004
126 list = curl_slist_append(list, "Expect: 100-continue");
127 }
128
129 if (list != NULL) {
130 curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, list);
131 m_headers = list;
132 }
133}
134
136 m_offset = 0;
137 m_status_code = -1;
138 m_content_length = -1;
139 m_push_length = -1;
140 m_recv_all_headers = false;
141 m_recv_status_line = false;
142}
143
144size_t State::HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata)
145{
146 State *obj = static_cast<State*>(userdata);
147 std::string header(buffer, size*nitems);
148 return obj->Header(header);
149}
150
151int State::Header(const std::string &header) {
152 //printf("Received remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str());
153 if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect?
154 m_recv_all_headers = false;
155 m_recv_status_line = false;
156 }
157 if (!m_recv_status_line) {
158 std::stringstream ss(header);
159 std::string item;
160 if (!std::getline(ss, item, ' ')) return 0;
161 m_resp_protocol = item;
162 //printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str());
163 if (!std::getline(ss, item, ' ')) return 0;
164 try {
165 m_status_code = std::stol(item);
166 } catch (...) {
167 return 0;
168 }
169 m_recv_status_line = true;
170 } else if (header.size() == 0 || header == "\n" || header == "\r\n") {
171 m_recv_all_headers = true;
172 }
173 else if (header != "\r\n") {
174 // Parse the header
175 std::size_t found = header.find(":");
176 if (found != std::string::npos) {
177 std::string header_name = header.substr(0, found);
178 std::transform(header_name.begin(), header_name.end(), header_name.begin(), ::tolower);
179 std::string header_value = header.substr(found+1);
180 if (header_name == "content-length")
181 {
182 try {
183 m_content_length = std::stoll(header_value);
184 } catch (...) {
185 // Header unparseable -- not a great sign, fail request.
186 //printf("Content-length header unparseable\n");
187 return 0;
188 }
189 }
190 } else {
191 // Non-empty header that isn't the status line, but no ':' present --
192 // malformed request?
193 //printf("Malformed header: %s\n", header.c_str());
194 return 0;
195 }
196 }
197 return header.size();
198}
199
200size_t State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata) {
201 State *obj = static_cast<State*>(userdata);
202 if (obj->GetStatusCode() < 0) {
203 return 0;
204 } // malformed request - got body before headers.
205 if (obj->GetStatusCode() >= 400) {
206 obj->m_error_buf += std::string(static_cast<char*>(buffer),
207 std::min(static_cast<size_t>(1024), size*nitems));
208 // Record error messages until we hit a KB; at that point, fail out.
209 if (obj->m_error_buf.size() >= 1024)
210 return 0;
211 else
212 return size*nitems;
213 } // Status indicates failure.
214 return obj->Write(static_cast<char*>(buffer), size*nitems);
215}
216
217ssize_t State::Write(char *buffer, size_t size) {
218 ssize_t retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false);
219 if (retval == SFS_ERROR) {
220 m_error_buf = m_stream->GetErrorMessage();
221 m_error_code = 1;
222 return -1;
223 }
224 m_offset += retval;
225 return retval;
226}
227
229 if (m_push) {
230 return 0;
231 }
232
233 ssize_t retval = m_stream->Write(m_start_offset + m_offset, 0, 0, true);
234 if (retval == SFS_ERROR) {
235 m_error_buf = m_stream->GetErrorMessage();
236 m_error_code = 2;
237 return -1;
238 }
239 m_offset += retval;
240 return retval;
241}
242
243size_t State::ReadCB(void *buffer, size_t size, size_t nitems, void *userdata) {
244 State *obj = static_cast<State*>(userdata);
245 if (obj->GetStatusCode() < 0) {return 0;} // malformed request - got body before headers.
246 if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure.
247 return obj->Read(static_cast<char*>(buffer), size*nitems);
248}
249
250int State::Read(char *buffer, size_t size) {
251 int retval = m_stream->Read(m_start_offset + m_offset, buffer, size);
252 if (retval == SFS_ERROR) {
253 return -1;
254 }
255 m_offset += retval;
256 //printf("Read a total of %ld bytes.\n", m_offset);
257 return retval;
258}
259
261 CURL *curl = curl_easy_duphandle(m_curl);
262 if (!curl) {
263 throw std::runtime_error("Failed to duplicate existing curl handle.");
264 }
265
266 State *state = new State(0, *m_stream, curl, m_push, tpcForwardCreds);
267
268 if (m_headers) {
269 state->m_headers_copy.reserve(m_headers_copy.size());
270 for (std::vector<std::string>::const_iterator header_iter = m_headers_copy.begin();
271 header_iter != m_headers_copy.end();
272 header_iter++) {
273 state->m_headers = curl_slist_append(state->m_headers, header_iter->c_str());
274 state->m_headers_copy.push_back(*header_iter);
275 }
276 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
277 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, state->m_headers);
278 }
279
280 return state;
281}
282
283void State::SetTransferParameters(off_t offset, size_t size) {
284 m_start_offset = offset;
285 m_offset = 0;
286 m_content_length = size;
287 std::stringstream ss;
288 ss << offset << "-" << (offset+size-1);
289 curl_easy_setopt(m_curl, CURLOPT_RANGE, ss.str().c_str());
290}
291
293{
294 return m_stream->AvailableBuffers();
295}
296
298{
299 m_stream->DumpBuffers();
300}
301
303{
304 if (!m_stream->Finalize()) {
305 m_error_buf = m_stream->GetErrorMessage();
306 m_error_code = 3;
307 return false;
308 }
309 return true;
310}
311
313{
314 // CURLINFO_PRIMARY_PORT is only defined for 7.21.0 or later; on older
315 // library versions, simply omit this information.
316#if LIBCURL_VERSION_NUM >= 0x071500
317 char *curl_ip = NULL;
318 CURLcode rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_IP, &curl_ip);
319 if ((rc != CURLE_OK) || !curl_ip) {
320 return "";
321 }
322 long curl_port = 0;
323 rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_PORT, &curl_port);
324 if ((rc != CURLE_OK) || !curl_port) {
325 return "";
326 }
327 std::stringstream ss;
328 // libcurl returns IPv6 addresses of the form:
329 // 2600:900:6:1301:5054:ff:fe0b:9cba:8000
330 // However the HTTP-TPC spec says to use the form
331 // [2600:900:6:1301:5054:ff:fe0b:9cba]:8000
332 // Hence, we add '[' and ']' whenever a ':' is seen.
333 if (NULL == strchr(curl_ip, ':'))
334 ss << "tcp:" << curl_ip << ":" << curl_port;
335 else
336 ss << "tcp:[" << curl_ip << "]:" << curl_port;
337 return ss.str();
338#else
339 return "";
340#endif
341}
#define stat(a, b)
Definition XrdPosix.hh:101
#define SFS_ERROR
#define SFS_OK
void CURL
State * Duplicate()
void Move(State &other)
int GetStatusCode() const
void DumpBuffers() const
void ResetAfterRequest()
void SetTransferParameters(off_t offset, size_t size)
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
bool Finalize()
int AvailableBuffers() const
int Read(off_t offset, char *buffer, size_t size)
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
bool Finalize()
void DumpBuffers() const
std::string GetErrorMessage() const
size_t AvailableBuffers() const
int Stat(struct stat *)
std::map< std::string, std::string > & headers