XRootD
Loading...
Searching...
No Matches
XrdClXRootDMsgHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClMessage.hh"
31#include "XrdCl/XrdClURL.hh"
32#include "XrdCl/XrdClUtils.hh"
39#include "XrdCl/XrdClSocket.hh"
40#include "XrdCl/XrdClTls.hh"
42
43#include "XrdOuc/XrdOucCRC.hh"
45
46#include "XrdSys/XrdSysPlatform.hh" // same as above
49#include <memory>
50#include <sstream>
51#include <numeric>
52
53namespace
54{
55 //----------------------------------------------------------------------------
56 // We need an extra task what will run the handler in the future, because
57 // tasks get deleted and we need the handler
58 //----------------------------------------------------------------------------
59 class WaitTask: public XrdCl::Task
60 {
61 public:
62 WaitTask( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
63 {
64 std::ostringstream o;
65 o << "WaitTask for: 0x" << handler->GetRequest();
66 SetName( o.str() );
67 }
68
69 virtual time_t Run( time_t now )
70 {
71 pHandler->WaitDone( now );
72 return 0;
73 }
74 private:
76 };
77}
78
79namespace XrdCl
80{
81 //----------------------------------------------------------------------------
82 // Delegate the response handling to the thread-pool
83 //----------------------------------------------------------------------------
84 class HandleRspJob: public XrdCl::Job
85 {
86 public:
87 HandleRspJob( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
88 {
89
90 }
91
92 virtual ~HandleRspJob()
93 {
94
95 }
96
97 virtual void Run( void *arg )
98 {
99 pHandler->HandleResponse();
100 delete this;
101 }
102 private:
103 XrdCl::XRootDMsgHandler *pHandler;
104 };
105
106 //----------------------------------------------------------------------------
107 // Examine an incoming message, and decide on the action to be taken
108 //----------------------------------------------------------------------------
109 uint16_t XRootDMsgHandler::Examine( std::shared_ptr<Message> &msg )
110 {
111 const int sst = pSendingState.fetch_or( kSawResp );
112
113 if( !( sst & kSendDone ) && !( sst & kSawResp ) )
114 {
115 // we must have been sent although we haven't got the OnStatusReady
116 // notification yet. Set the inflight notice.
117
118 Log *log = DefaultEnv::GetLog();
119 log->Dump( XRootDMsg, "[%s] Message %s reply received before notification "
120 "that it was sent, assuming it was sent ok.",
121 pUrl.GetHostId().c_str(),
122 pRequest->GetObfuscatedDescription().c_str() );
123
124 pMsgInFly = true;
125 }
126
127 //--------------------------------------------------------------------------
128 // if the MsgHandler is already being used to process another request
129 // (kXR_oksofar) we need to wait
130 //--------------------------------------------------------------------------
131 if( pOksofarAsAnswer )
132 {
133 XrdSysCondVarHelper lck( pCV );
134 while( pResponse ) pCV.Wait();
135 }
136 else
137 {
138 if( pResponse )
139 {
140 Log *log = DefaultEnv::GetLog();
141 log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
142 "it already owns a response: %p (message: %s ).",
143 pUrl.GetHostId().c_str(), this,
144 pRequest->GetObfuscatedDescription().c_str() );
145 }
146 }
147
148 if( msg->GetSize() < 8 )
149 return Ignore;
150
151 ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
152 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
153 uint16_t status = 0;
154 uint32_t dlen = 0;
155
156 //--------------------------------------------------------------------------
157 // We only care about async responses, but those are extracted now
158 // in the SocketHandler.
159 //--------------------------------------------------------------------------
160 if( rsp->hdr.status == kXR_attn )
161 {
162 return Ignore;
163 }
164 //--------------------------------------------------------------------------
165 // We got a sync message - check if it belongs to us
166 //--------------------------------------------------------------------------
167 else
168 {
169 if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
170 rsp->hdr.streamid[1] != req->header.streamid[1] )
171 return Ignore;
172
173 status = rsp->hdr.status;
174 dlen = rsp->hdr.dlen;
175 }
176
177 //--------------------------------------------------------------------------
178 // We take the ownership of the message and decide what we will do
179 // with the handler itself, the options are:
180 // 1) we want to either read in raw mode (the Raw flag) or have the message
181 // body reconstructed for us by the TransportHandler by the time
182 // Process() is called (default, no extra flag)
183 // 2) we either got a full response in which case we don't want to be
184 // notified about anything anymore (RemoveHandler) or we got a partial
185 // answer and we need to wait for more (default, no extra flag)
186 //--------------------------------------------------------------------------
187 pResponse = msg;
188 pBodyReader->SetDataLength( dlen );
189
190 Log *log = DefaultEnv::GetLog();
191 switch( status )
192 {
193 //------------------------------------------------------------------------
194 // Handle the cached cases
195 //------------------------------------------------------------------------
196 case kXR_error:
197 case kXR_redirect:
198 case kXR_wait:
199 return RemoveHandler;
200
201 case kXR_waitresp:
202 {
203 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
204 "message %s", pUrl.GetHostId().c_str(),
205 pRequest->GetObfuscatedDescription().c_str() );
206
207 pResponse.reset();
208 return Ignore; // This must be handled synchronously!
209 }
210
211 //------------------------------------------------------------------------
212 // Handle the potential raw cases
213 //------------------------------------------------------------------------
214 case kXR_ok:
215 {
216 //----------------------------------------------------------------------
217 // For kXR_read we read in raw mode
218 //----------------------------------------------------------------------
219 uint16_t reqId = ntohs( req->header.requestid );
220 if( reqId == kXR_read )
221 {
222 return Raw | RemoveHandler;
223 }
224
225 //----------------------------------------------------------------------
226 // kXR_readv is the same as kXR_read
227 //----------------------------------------------------------------------
228 if( reqId == kXR_readv )
229 {
230 return Raw | RemoveHandler;
231 }
232
233 //----------------------------------------------------------------------
234 // For everything else we just take what we got
235 //----------------------------------------------------------------------
236 return RemoveHandler;
237 }
238
239 //------------------------------------------------------------------------
240 // kXR_oksofars are special, they are not full responses, so we reset
241 // the response pointer to 0 and add the message to the partial list
242 //------------------------------------------------------------------------
243 case kXR_oksofar:
244 {
245 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
246 "%s", pUrl.GetHostId().c_str(),
247 pRequest->GetObfuscatedDescription().c_str() );
248
249 if( !pOksofarAsAnswer )
250 {
251 pPartialResps.emplace_back( std::move( pResponse ) );
252 }
253
254 //----------------------------------------------------------------------
255 // For kXR_read we either read in raw mode if the message has not
256 // been fully reconstructed already, if it has, we adjust
257 // the buffer offset to prepare for the next one
258 //----------------------------------------------------------------------
259 uint16_t reqId = ntohs( req->header.requestid );
260 if( reqId == kXR_read )
261 {
262 pTimeoutFence.store( true, std::memory_order_relaxed );
263 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
264 }
265
266 //----------------------------------------------------------------------
267 // kXR_readv is similar to read, except that the payload is different
268 //----------------------------------------------------------------------
269 if( reqId == kXR_readv )
270 {
271 pTimeoutFence.store( true, std::memory_order_relaxed );
272 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
273 }
274
275 return ( pOksofarAsAnswer ? None : NoProcess );
276 }
277
278 case kXR_status:
279 {
280 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
281 "%s", pUrl.GetHostId().c_str(),
282 pRequest->GetObfuscatedDescription().c_str() );
283
284 uint16_t reqId = ntohs( req->header.requestid );
285 if( reqId == kXR_pgwrite )
286 {
287 //--------------------------------------------------------------------
288 // In case of pgwrite by definition this wont be a partial response
289 // so we can already remove the handler from the in-queue
290 //--------------------------------------------------------------------
291 return RemoveHandler;
292 }
293
294 //----------------------------------------------------------------------
295 // Otherwise (pgread), first of all we need to read the body of the
296 // kXR_status response, we can handle the raw data (if any) only after
297 // we have the whole kXR_status body
298 //----------------------------------------------------------------------
299 pTimeoutFence.store( true, std::memory_order_relaxed );
300 return None;
301 }
302
303 //------------------------------------------------------------------------
304 // Default
305 //------------------------------------------------------------------------
306 default:
307 return RemoveHandler;
308 }
309 return RemoveHandler;
310 }
311
312 //----------------------------------------------------------------------------
313 // Reexamine the incoming message, and decide on the action to be taken
314 //----------------------------------------------------------------------------
316 {
317 if( !pResponse )
318 return 0;
319
320 Log *log = DefaultEnv::GetLog();
321 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
322
323 //--------------------------------------------------------------------------
324 // Additional action is only required for kXR_status
325 //--------------------------------------------------------------------------
326 if( rsp->hdr.status != kXR_status ) return 0;
327
328 //--------------------------------------------------------------------------
329 // Ignore malformed status response
330 //--------------------------------------------------------------------------
331 if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
332 {
333 log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
334 return Corrupted;
335 }
336
337 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
338 uint16_t reqId = ntohs( req->header.requestid );
339 //--------------------------------------------------------------------------
340 // Unmarshal the status body
341 //--------------------------------------------------------------------------
342 XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
343
344 if( !st.IsOK() && st.code == errDataError )
345 {
346 log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
347 st.GetErrorMessage().c_str() );
348 return Corrupted;
349 }
350
351 if( !st.IsOK() )
352 {
353 log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
354 pUrl.GetHostId().c_str() );
355 pStatus = st;
356 HandleRspOrQueue();
357 return Ignore;
358 }
359
360 //--------------------------------------------------------------------------
361 // Common handling for partial results
362 //--------------------------------------------------------------------------
363 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
365 {
366 pPartialResps.push_back( std::move( pResponse ) );
367 }
368
369 //--------------------------------------------------------------------------
370 // Decide the actions that we need to take
371 //--------------------------------------------------------------------------
372 uint16_t action = 0;
373 if( reqId == kXR_pgread )
374 {
375 //----------------------------------------------------------------------
376 // The message contains only Status header and body but no raw data
377 //----------------------------------------------------------------------
378 if( !pPageReader )
379 pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
380 pPageReader->SetRsp( rspst );
381
382 action |= Raw;
383
385 action |= NoProcess;
386 else
387 action |= RemoveHandler;
388 }
389 else if( reqId == kXR_pgwrite )
390 {
391 // if data corruption has been detected on the server side we will
392 // send some additional data pointing to the pages that need to be
393 // retransmitted
394 if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
395 pResponse->GetCursor() )
396 action |= More;
397 }
398
399 return action;
400 }
401
402 //----------------------------------------------------------------------------
403 // Get handler sid
404 //----------------------------------------------------------------------------
406 {
407 ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
408 return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
409 }
410
411 //----------------------------------------------------------------------------
413 //----------------------------------------------------------------------------
415 {
416 Log *log = DefaultEnv::GetLog();
417
418 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
419
420 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
421
422 //--------------------------------------------------------------------------
423 // If it is a local file, it can be only a metalink redirector
424 //--------------------------------------------------------------------------
425 if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
426 pHosts->back().protocol = kXR_PROTOCOLVERSION;
427
428 //--------------------------------------------------------------------------
429 // We got an answer, check who we were talking to
430 //--------------------------------------------------------------------------
431 else
432 {
433 AnyObject qryResult;
434 int *qryResponse = nullptr;
435 pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
436 qryResult.Get( qryResponse );
437 if (qryResponse) {
438 pHosts->back().flags = *qryResponse;
439 delete qryResponse;
440 qryResponse = nullptr;
441 }
442 pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
443 qryResult.Get( qryResponse );
444 if (qryResponse) {
445 pHosts->back().protocol = *qryResponse;
446 delete qryResponse;
447 }
448 }
449
450 //--------------------------------------------------------------------------
451 // Process the message
452 //--------------------------------------------------------------------------
453 Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
454 if( !st.IsOK() )
455 {
456 pStatus = Status( stFatal, errInvalidMessage );
457 HandleResponse();
458 return;
459 }
460
461 //--------------------------------------------------------------------------
462 // we have an response for the message so it's not in fly anymore
463 //--------------------------------------------------------------------------
464 pMsgInFly = false;
465
466 //--------------------------------------------------------------------------
467 // Reset the aggregated wait (used to omit wait response in case of Metalink
468 // redirector)
469 //--------------------------------------------------------------------------
470 if( rsp->hdr.status != kXR_wait )
471 pAggregatedWaitTime = 0;
472
473 switch( rsp->hdr.status )
474 {
475 //------------------------------------------------------------------------
476 // kXR_ok - we're done here
477 //------------------------------------------------------------------------
478 case kXR_ok:
479 {
480 log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
481 pUrl.GetHostId().c_str(),
482 pRequest->GetObfuscatedDescription().c_str() );
483 pStatus = Status();
484 HandleResponse();
485 return;
486 }
487
488 case kXR_status:
489 {
490 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
491 pUrl.GetHostId().c_str(),
492 pRequest->GetObfuscatedDescription().c_str() );
493 pStatus = Status();
494 HandleResponse();
495 return;
496 }
497
498 //------------------------------------------------------------------------
499 // kXR_ok - we're serving partial result to the user
500 //------------------------------------------------------------------------
501 case kXR_oksofar:
502 {
503 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
504 pUrl.GetHostId().c_str(),
505 pRequest->GetObfuscatedDescription().c_str() );
506 pStatus = Status( stOK, suContinue );
507 HandleResponse();
508 return;
509 }
510
511 //------------------------------------------------------------------------
512 // kXR_error - we've got a problem
513 //------------------------------------------------------------------------
514 case kXR_error:
515 {
516 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
517 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
518 log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
519 "[%d] %s", pUrl.GetHostId().c_str(),
520 pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
521 errmsg );
522 delete [] errmsg;
523
524 HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
525 return;
526 }
527
528 //------------------------------------------------------------------------
529 // kXR_redirect - they tell us to go elsewhere
530 //------------------------------------------------------------------------
531 case kXR_redirect:
532 {
533 if( rsp->hdr.dlen <= 4 )
534 {
535 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
536 pUrl.GetHostId().c_str() );
537 pStatus = Status( stError, errInvalidResponse );
538 HandleResponse();
539 return;
540 }
541
542 char *urlInfoBuff = new char[rsp->hdr.dlen-3];
543 urlInfoBuff[rsp->hdr.dlen-4] = 0;
544 memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
545 std::string urlInfo = urlInfoBuff;
546 delete [] urlInfoBuff;
547 log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
548 "message %s: %s, port %d", pUrl.GetHostId().c_str(),
549 pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
550 rsp->body.redirect.port );
551
552 //----------------------------------------------------------------------
553 // Check if we can proceed
554 //----------------------------------------------------------------------
555 if( !pRedirectCounter )
556 {
557 log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
558 "message %s, the last known error is: %s",
559 pUrl.GetHostId().c_str(),
560 pRequest->GetObfuscatedDescription().c_str(),
561 pLastError.ToString().c_str() );
562
563
564 pStatus = Status( stFatal, errRedirectLimit );
565 HandleResponse();
566 return;
567 }
568 --pRedirectCounter;
569
570 //----------------------------------------------------------------------
571 // Keep the info about this server if we still need to find a load
572 // balancer
573 //----------------------------------------------------------------------
574 uint32_t flags = pHosts->back().flags;
575 if( !pHasLoadBalancer )
576 {
577 if( flags & kXR_isManager )
578 {
579 //------------------------------------------------------------------
580 // If the current server is a meta manager then it supersedes
581 // any existing load balancer, otherwise we assign a load-balancer
582 // only if it has not been already assigned
583 //------------------------------------------------------------------
584 if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
585 {
586 pLoadBalancer = pHosts->back();
587 log->Dump( XRootDMsg, "[%s] Current server has been assigned "
588 "as a load-balancer for message %s",
589 pUrl.GetHostId().c_str(),
590 pRequest->GetObfuscatedDescription().c_str() );
591 HostList::iterator it;
592 for( it = pHosts->begin(); it != pHosts->end(); ++it )
593 it->loadBalancer = false;
594 pHosts->back().loadBalancer = true;
595 }
596 }
597 }
598
599 //----------------------------------------------------------------------
600 // If the redirect comes from a data server safe the URL because
601 // in case of a failure we will use it as the effective data server URL
602 // for the tried CGI opaque info
603 //----------------------------------------------------------------------
604 if( flags & kXR_isServer )
605 pEffectiveDataServerUrl = new URL( pHosts->back().url );
606
607 //----------------------------------------------------------------------
608 // Build the URL and check it's validity
609 //----------------------------------------------------------------------
610 std::vector<std::string> urlComponents;
611 std::string newCgi;
612 Utils::splitString( urlComponents, urlInfo, "?" );
613
614 std::ostringstream o;
615
616 o << urlComponents[0];
617 if( rsp->body.redirect.port > 0 )
618 o << ":" << rsp->body.redirect.port << "/";
619 else if( rsp->body.redirect.port < 0 )
620 {
621 //--------------------------------------------------------------------
622 // check if the manager wants to enforce write recovery at himself
623 // (beware we are dealing here with negative flags)
624 //--------------------------------------------------------------------
625 if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
626 pHosts->back().flags |= kXR_recoverWrts;
627
628 //--------------------------------------------------------------------
629 // check if the manager wants to collapse the communication channel
630 // (the redirect host is to replace the current host)
631 //--------------------------------------------------------------------
632 if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
633 {
634 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
635 pPostMaster->CollapseRedirect( pUrl, url );
636 }
637
638 if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
639 {
640 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
641 if( Utils::CheckEC( pRequest, url ) )
642 pRedirectAsAnswer = true;
643 }
644 }
645
646 URL newUrl = URL( o.str() );
647 if( !newUrl.IsValid() )
648 {
650 log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
651 pUrl.GetHostId().c_str(), urlInfo.c_str() );
652 HandleResponse();
653 return;
654 }
655
656 if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
657 newUrl.SetUserName( pUrl.GetUserName() );
658
659 if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
660 newUrl.SetPassword( pUrl.GetPassword() );
661
662 //----------------------------------------------------------------------
663 // Forward any "xrd.*" params from the original client request also to
664 // the new redirection url
665 // Also, we need to preserve any "xrdcl.*' as they are important for
666 // our internal workflows.
667 //----------------------------------------------------------------------
668 std::ostringstream ossXrd;
669 const URL::ParamsMap &urlParams = pUrl.GetParams();
670
671 for(URL::ParamsMap::const_iterator it = urlParams.begin();
672 it != urlParams.end(); ++it )
673 {
674 if( it->first.compare( 0, 4, "xrd." ) &&
675 it->first.compare( 0, 6, "xrdcl." ) )
676 continue;
677
678 ossXrd << it->first << '=' << it->second << '&';
679 }
680
681 std::string xrdCgi = ossXrd.str();
682 pRedirectUrl = newUrl.GetURL();
683
684 URL cgiURL;
685 if( urlComponents.size() > 1 )
686 {
687 pRedirectUrl += "?";
688 pRedirectUrl += urlComponents[1];
689 std::ostringstream o;
690 o << "fake://fake:111//fake?";
691 o << urlComponents[1];
692
693 if( urlComponents.size() == 3 )
694 o << '?' << urlComponents[2];
695
696 if (!xrdCgi.empty())
697 {
698 o << '&' << xrdCgi;
699 pRedirectUrl += '&';
700 pRedirectUrl += xrdCgi;
701 }
702
703 cgiURL = URL( o.str() );
704 }
705 else {
706 if (!xrdCgi.empty())
707 {
708 std::ostringstream o;
709 o << "fake://fake:111//fake?";
710 o << xrdCgi;
711 cgiURL = URL( o.str() );
712 pRedirectUrl += '?';
713 pRedirectUrl += xrdCgi;
714 }
715 }
716
717 //----------------------------------------------------------------------
718 // Check if we need to return the URL as a response
719 //----------------------------------------------------------------------
720 if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
721 newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
722 !newUrl.IsLocalFile() )
723 pRedirectAsAnswer = true;
724
725 if( pRedirectAsAnswer )
726 {
727 pStatus = Status( stError, errRedirect );
728 HandleResponse();
729 return;
730 }
731
732 //----------------------------------------------------------------------
733 // Rewrite the message in a way required to send it to another server
734 //----------------------------------------------------------------------
735 newUrl.SetParams( cgiURL.GetParams() );
736 Status st = RewriteRequestRedirect( newUrl );
737 if( !st.IsOK() )
738 {
739 pStatus = st;
740 HandleResponse();
741 return;
742 }
743
744 //----------------------------------------------------------------------
745 // Make sure we don't change the protocol by accident (root vs roots)
746 //----------------------------------------------------------------------
747 if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
748 ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
749 newUrl.SetProtocol( "roots" );
750
751 //----------------------------------------------------------------------
752 // Send the request to the new location
753 //----------------------------------------------------------------------
754 HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
755 return;
756 }
757
758 //------------------------------------------------------------------------
759 // kXR_wait - we wait, and re-issue the request later
760 //------------------------------------------------------------------------
761 case kXR_wait:
762 {
763 uint32_t waitSeconds = 0;
764
765 if( rsp->hdr.dlen >= 4 )
766 {
767 char *infoMsg = new char[rsp->hdr.dlen-3];
768 infoMsg[rsp->hdr.dlen-4] = 0;
769 memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
770 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
771 "message %s: %s", pUrl.GetHostId().c_str(),
772 rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
773 infoMsg );
774 delete [] infoMsg;
775 waitSeconds = rsp->body.wait.seconds;
776 }
777 else
778 {
779 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
780 "message %s", pUrl.GetHostId().c_str(),
781 pRequest->GetObfuscatedDescription().c_str() );
782 }
783
784 pAggregatedWaitTime += waitSeconds;
785
786 // We need a special case if the data node comes from metalink
787 // redirector. In this case it might make more sense to try the
788 // next entry in the Metalink than wait.
789 if( OmitWait( *pRequest, pLoadBalancer.url ) )
790 {
791 int maxWait = DefaultMaxMetalinkWait;
792 DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
793 if( pAggregatedWaitTime > maxWait )
794 {
795 UpdateTriedCGI();
796 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
797 return;
798 }
799 }
800
801 //----------------------------------------------------------------------
802 // Some messages require rewriting before they can be sent again
803 // after wait
804 //----------------------------------------------------------------------
805 Status st = RewriteRequestWait();
806 if( !st.IsOK() )
807 {
808 pStatus = st;
809 HandleResponse();
810 return;
811 }
812
813 //----------------------------------------------------------------------
814 // Register a task to resend the message in some seconds, if we still
815 // have time to do that, and report a timeout otherwise
816 //----------------------------------------------------------------------
817 time_t resendTime = ::time(0)+waitSeconds;
818
819 if( resendTime < pExpiration )
820 {
821 log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
822 pUrl.GetHostId().c_str(), this,
823 pRequest->GetObfuscatedDescription().c_str() );
824
825 TaskManager *taskMgr = pPostMaster->GetTaskManager();
826 taskMgr->RegisterTask( new WaitTask( this ), resendTime );
827 }
828 else
829 {
830 log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
831 pUrl.GetHostId().c_str(),
832 pRequest->GetObfuscatedDescription().c_str() );
833 HandleError( Status( stError, errOperationExpired) );
834 }
835 return;
836 }
837
838 //------------------------------------------------------------------------
839 // kXR_waitresp - the response will be returned in some seconds as an
840 // unsolicited message. Currently all messages of this type are handled
841 // one step before in the XrdClStream::OnIncoming as they need to be
842 // processed synchronously.
843 //------------------------------------------------------------------------
844 case kXR_waitresp:
845 {
846 if( rsp->hdr.dlen < 4 )
847 {
848 log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
849 pUrl.GetHostId().c_str() );
850 pStatus = Status( stError, errInvalidResponse );
851 HandleResponse();
852 return;
853 }
854
855 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
856 "message %s", pUrl.GetHostId().c_str(),
857 rsp->body.waitresp.seconds,
858 pRequest->GetObfuscatedDescription().c_str() );
859 return;
860 }
861
862 //------------------------------------------------------------------------
863 // Default - unrecognized/unsupported response, declare an error
864 //------------------------------------------------------------------------
865 default:
866 {
867 log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
868 "message %s", pUrl.GetHostId().c_str(),
869 rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
870 pStatus = Status( stError, errInvalidResponse );
871 HandleResponse();
872 return;
873 }
874 }
875
876 return;
877 }
878
879 //----------------------------------------------------------------------------
880 // Handle an event other that a message arrival - may be timeout
881 //----------------------------------------------------------------------------
883 XRootDStatus status )
884 {
885 Log *log = DefaultEnv::GetLog();
886 log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
887 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
888
889 if( event == Ready )
890 return 0;
891
892 if( pTimeoutFence.load( std::memory_order_relaxed ) )
893 return 0;
894
895 HandleError( status );
896 return RemoveHandler;
897 }
898
899 //----------------------------------------------------------------------------
900 // Read message body directly from a socket
901 //----------------------------------------------------------------------------
903 Socket *socket,
904 uint32_t &bytesRead )
905 {
906 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
907 uint16_t reqId = ntohs( req->header.requestid );
908
909 if( reqId == kXR_pgread )
910 return pPageReader->Read( *socket, bytesRead );
911
912 return pBodyReader->Read( *socket, bytesRead );
913 }
914
915 //----------------------------------------------------------------------------
916 // We're here when we requested sending something over the wire
917 // or other status update on this action.
918 // We can be called when message is still in out-queue, with an
919 // error status indicating message will not be sent.
920 //----------------------------------------------------------------------------
922 XRootDStatus status )
923 {
924 Log *log = DefaultEnv::GetLog();
925
926 const int sst = pSendingState.fetch_or( kSendDone );
927
928 // if we have already seen a response we can not be in the out-queue
929 // anymore, so we should be getting notified of a successful send.
930 // If not log and do our best to recover.
931 if( !status.IsOK() && ( ( sst & kFinalResp ) || ( sst & kSawResp ) ) )
932 {
933 log->Error( XRootDMsg, "[%s] Unexpected error for message %s. Trying to "
934 "recover.", pUrl.GetHostId().c_str(),
935 message->GetObfuscatedDescription().c_str() );
936 HandleError( status );
937 return;
938 }
939
940 if( sst & kFinalResp )
941 {
942 log->Dump( XRootDMsg, "[%s] Got late notification that outgoing message %s was "
943 "sent, already have final response, queuing handler callback.",
944 pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
945 HandleRspOrQueue();
946 return;
947 }
948
949 if( sst & kSawResp )
950 {
951 log->Dump( XRootDMsg, "[%s] Got late notification that message %s has "
952 "been successfully sent.",
953 pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
954 return;
955 }
956
957 //--------------------------------------------------------------------------
958 // We were successful, so we now need to listen for a response
959 //--------------------------------------------------------------------------
960 if( status.IsOK() )
961 {
962 log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
963 pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
964
965 pMsgInFly = true;
966 return;
967 }
968
969 //--------------------------------------------------------------------------
970 // We have failed, recover if possible
971 //--------------------------------------------------------------------------
972 log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
973 "recover.", pUrl.GetHostId().c_str(),
974 message->GetObfuscatedDescription().c_str() );
975 HandleError( status );
976 }
977
978 //----------------------------------------------------------------------------
979 // Are we a raw writer or not?
980 //----------------------------------------------------------------------------
982 {
983 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
984 uint16_t reqId = ntohs( req->header.requestid );
985 if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
986 return true;
987 // checkpoint + execute
988 if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
989 {
990 ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
991 reqId = ntohs( xeq->header.requestid );
992 return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
993 }
994
995 return false;
996 }
997
998 //----------------------------------------------------------------------------
999 // Write the message body
1000 //----------------------------------------------------------------------------
1002 uint32_t &bytesWritten )
1003 {
1004 //--------------------------------------------------------------------------
1005 // First check if it is a PgWrite
1006 //--------------------------------------------------------------------------
1007 if( !pChunkList->empty() && !pCrc32cDigests.empty() )
1008 {
1009 //------------------------------------------------------------------------
1010 // PgWrite will have just one chunk
1011 //------------------------------------------------------------------------
1012 ChunkInfo chunk = pChunkList->front();
1013 //------------------------------------------------------------------------
1014 // Calculate the size of the first and last page (in case the chunk is not
1015 // 4KB aligned)
1016 //------------------------------------------------------------------------
1017 int fLen = 0, lLen = 0;
1018 size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
1019
1020 //------------------------------------------------------------------------
1021 // Set the crc32c buffer if not ready yet
1022 //------------------------------------------------------------------------
1023 if( pPgWrtCksumBuff.GetCursor() == 0 )
1024 {
1025 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1026 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1027 }
1028
1029 uint32_t btsLeft = chunk.length - pAsyncOffset;
1030 uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1031 if( pglen > btsLeft ) pglen = btsLeft;
1032 char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
1033
1034 while( btsLeft > 0 )
1035 {
1036 // first write the crc32c digest
1037 while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
1038 {
1039 uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
1040 char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
1041 int btswrt = 0;
1042 Status st = socket->Send( dgstbuf, dgstlen, btswrt );
1043 if( !st.IsOK() ) return st;
1044 bytesWritten += btswrt;
1045 pPgWrtCksumBuff.AdvanceCursor( btswrt );
1046 if( st.code == suRetry ) return st;
1047 }
1048 // then write the raw data (one page)
1049 int btswrt = 0;
1050 Status st = socket->Send( pgbuf, pglen, btswrt );
1051 if( !st.IsOK() ) return st;
1052 pgbuf += btswrt;
1053 pglen -= btswrt;
1054 btsLeft -= btswrt;
1055 bytesWritten += btswrt;
1056 pAsyncOffset += btswrt; // update the offset to the raw data
1057 if( st.code == suRetry ) return st;
1058 // if we managed to write all the data ...
1059 if( pglen == 0 )
1060 {
1061 // move to the next page
1062 ++pPgWrtCurrentPageNb;
1063 if( pPgWrtCurrentPageNb < nbpgs )
1064 {
1065 // set the digest buffer
1066 pPgWrtCksumBuff.SetCursor( 0 );
1067 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1068 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1069 }
1070 // set the page length
1071 pglen = XrdSys::PageSize;
1072 if( pglen > btsLeft ) pglen = btsLeft;
1073 // reset offset in the current page
1074 pPgWrtCurrentPageOffset = 0;
1075 }
1076 else
1077 // otherwise just adjust the offset in the current page
1078 pPgWrtCurrentPageOffset += btswrt;
1079
1080 }
1081 }
1082 else if( !pChunkList->empty() )
1083 {
1084 size_t size = pChunkList->size();
1085 for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1086 {
1087 char *buffer = (char*)(*pChunkList)[i].buffer;
1088 uint32_t size = (*pChunkList)[i].length;
1089 size_t leftToBeWritten = size - pAsyncOffset;
1090
1091 while( leftToBeWritten )
1092 {
1093 int btswrt = 0;
1094 Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1095 bytesWritten += btswrt;
1096 if( !st.IsOK() || st.code == suRetry ) return st;
1097 pAsyncOffset += btswrt;
1098 leftToBeWritten -= btswrt;
1099 }
1100 //----------------------------------------------------------------------
1101 // Remember that we have moved to the next chunk, also clear the offset
1102 // within the buffer as we are going to move to a new one
1103 //----------------------------------------------------------------------
1104 ++pAsyncChunkIndex;
1105 pAsyncOffset = 0;
1106 }
1107 }
1108 else
1109 {
1110 Log *log = DefaultEnv::GetLog();
1111
1112 //------------------------------------------------------------------------
1113 // If the socket is encrypted we cannot use a kernel buffer, we have to
1114 // convert to user space buffer
1115 //------------------------------------------------------------------------
1116 if( socket->IsEncrypted() )
1117 {
1118 log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1119 pUrl.GetHostId().c_str() );
1120
1121 char *ubuff = 0;
1122 ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1123 if( ret < 0 ) return Status( stError, errInternal );
1124 pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1125 return WriteMessageBody( socket, bytesWritten );
1126 }
1127
1128 //------------------------------------------------------------------------
1129 // Send the data
1130 //------------------------------------------------------------------------
1131 while( !pKBuff->Empty() )
1132 {
1133 int btswrt = 0;
1134 Status st = socket->Send( *pKBuff, btswrt );
1135 bytesWritten += btswrt;
1136 if( !st.IsOK() || st.code == suRetry ) return st;
1137 }
1138
1139 log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1140 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1141 }
1142
1143 return Status();
1144 }
1145
1146 //----------------------------------------------------------------------------
1147 // We're here when we got a time event. We needed to re-issue the request
1148 // in some time in the future, and that moment has arrived
1149 //----------------------------------------------------------------------------
1151 {
1152 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1153 }
1154
1155 //----------------------------------------------------------------------------
1156 // Bookkeeping after partial response has been received.
1157 //----------------------------------------------------------------------------
1159 {
1160 pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1161 }
1162
1163 //----------------------------------------------------------------------------
1164 // Unpack the message and call the response handler
1165 //----------------------------------------------------------------------------
1166 void XRootDMsgHandler::HandleResponse()
1167 {
1168 //--------------------------------------------------------------------------
1169 // Is it a final response?
1170 //--------------------------------------------------------------------------
1171 bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
1172 if( finalrsp )
1173 {
1174 // Do not do final processing of the response if we haven't had
1175 // confirmation the original request was sent (via OnStatusReady).
1176 // The final processing will be triggered when we get the confirm.
1177 const int sst = pSendingState.fetch_or( kFinalResp );
1178 if( !( sst & kSendDone ) )
1179 return;
1180 }
1181
1182 //--------------------------------------------------------------------------
1183 // Process the response and notify the listener
1184 //--------------------------------------------------------------------------
1186 XRootDStatus *status = ProcessStatus();
1187 AnyObject *response = 0;
1188
1189 Log *log = DefaultEnv::GetLog();
1190 log->Debug( ExDbgMsg, "[%s] Calling MsgHandler: %p (message: %s ) "
1191 "with status: %s.",
1192 pUrl.GetHostId().c_str(), this,
1193 pRequest->GetObfuscatedDescription().c_str(),
1194 status->ToString().c_str() );
1195
1196 if( status->IsOK() )
1197 {
1198 Status st = ParseResponse( response );
1199 if( !st.IsOK() )
1200 {
1201 delete status;
1202 delete response;
1203 status = new XRootDStatus( st );
1204 response = 0;
1205 }
1206 }
1207
1208 //--------------------------------------------------------------------------
1209 // Close the redirect entry if necessary
1210 //--------------------------------------------------------------------------
1211 if( pRdirEntry )
1212 {
1213 pRdirEntry->status = *status;
1214 pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1215 }
1216
1217 //--------------------------------------------------------------------------
1218 // Release the stream id
1219 //--------------------------------------------------------------------------
1220 if( pSidMgr && finalrsp )
1221 {
1222 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1223 if( status->IsOK() || !pMsgInFly ||
1224 !( status->code == errOperationExpired || status->code == errOperationInterrupted ) )
1225 pSidMgr->ReleaseSID( req->header.streamid );
1226 }
1227
1228 HostList *hosts = pHosts.release();
1229 if( !finalrsp )
1230 pHosts.reset( new HostList( *hosts ) );
1231
1232 pResponseHandler->HandleResponseWithHosts( status, response, hosts );
1233
1234 //--------------------------------------------------------------------------
1235 // if it is the final response there is nothing more to do ...
1236 //--------------------------------------------------------------------------
1237 if( finalrsp )
1238 delete this;
1239 //--------------------------------------------------------------------------
1240 // on the other hand if it is not the final response, we have to keep the
1241 // MsgHandler and delete the current response
1242 //--------------------------------------------------------------------------
1243 else
1244 {
1245 XrdSysCondVarHelper lck( pCV );
1246 pResponse.reset();
1247 pTimeoutFence.store( false, std::memory_order_relaxed );
1248 pCV.Broadcast();
1249 }
1250 }
1251
1252
1253 //----------------------------------------------------------------------------
1254 // Extract the status information from the stuff that we got
1255 //----------------------------------------------------------------------------
1256 XRootDStatus *XRootDMsgHandler::ProcessStatus()
1257 {
1258 XRootDStatus *st = new XRootDStatus( pStatus );
1259 ServerResponse *rsp = 0;
1260 if( pResponse )
1261 rsp = (ServerResponse *)pResponse->GetBuffer();
1262
1263 if( !pStatus.IsOK() && rsp )
1264 {
1265 if( pStatus.code == errErrorResponse )
1266 {
1267 st->errNo = rsp->body.error.errnum;
1268 // omit the last character as the string returned from the server
1269 // (acording to protocol specs) should be null-terminated
1270 std::string errmsg( rsp->body.error.errmsg, rsp->hdr.dlen-5 );
1271 if( st->errNo == kXR_noReplicas && !pLastError.IsOK() )
1272 errmsg += " Last seen error: " + pLastError.ToString();
1273 st->SetErrorMessage( errmsg );
1274 }
1275 else if( pStatus.code == errRedirect )
1276 st->SetErrorMessage( pRedirectUrl );
1277 }
1278 return st;
1279 }
1280
1281 //------------------------------------------------------------------------
1282 // Parse the response and put it in an object that could be passed to
1283 // the user
1284 //------------------------------------------------------------------------
1285 Status XRootDMsgHandler::ParseResponse( AnyObject *&response )
1286 {
1287 if( !pResponse )
1288 return Status();
1289
1290 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
1291 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1292 Log *log = DefaultEnv::GetLog();
1293
1294 //--------------------------------------------------------------------------
1295 // Handle redirect as an answer
1296 //--------------------------------------------------------------------------
1297 if( rsp->hdr.status == kXR_redirect )
1298 {
1299 log->Error( XRootDMsg, "Internal Error: unable to process redirect" );
1300 return 0;
1301 }
1302
1303 Buffer buff;
1304 uint32_t length = 0;
1305 char *buffer = 0;
1306
1307 //--------------------------------------------------------------------------
1308 // We don't have any partial answers so pass what we have
1309 //--------------------------------------------------------------------------
1310 if( pPartialResps.empty() )
1311 {
1312 buffer = rsp->body.buffer.data;
1313 length = rsp->hdr.dlen;
1314 }
1315 //--------------------------------------------------------------------------
1316 // Partial answers, we need to glue them together before parsing
1317 //--------------------------------------------------------------------------
1318 else if( req->header.requestid != kXR_read &&
1319 req->header.requestid != kXR_readv )
1320 {
1321 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1322 {
1323 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1324 length += part->hdr.dlen;
1325 }
1326 length += rsp->hdr.dlen;
1327
1328 buff.Allocate( length );
1329 uint32_t offset = 0;
1330 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1331 {
1332 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1333 buff.Append( part->body.buffer.data, part->hdr.dlen, offset );
1334 offset += part->hdr.dlen;
1335 }
1336 buff.Append( rsp->body.buffer.data, rsp->hdr.dlen, offset );
1337 buffer = buff.GetBuffer();
1338 }
1339
1340 //--------------------------------------------------------------------------
1341 // Right, but what was the question?
1342 //--------------------------------------------------------------------------
1343 switch( req->header.requestid )
1344 {
1345 //------------------------------------------------------------------------
1346 // kXR_mv, kXR_truncate, kXR_rm, kXR_mkdir, kXR_rmdir, kXR_chmod,
1347 // kXR_ping, kXR_close, kXR_write, kXR_sync
1348 //------------------------------------------------------------------------
1349 case kXR_mv:
1350 case kXR_truncate:
1351 case kXR_rm:
1352 case kXR_mkdir:
1353 case kXR_rmdir:
1354 case kXR_chmod:
1355 case kXR_ping:
1356 case kXR_close:
1357 case kXR_write:
1358 case kXR_writev:
1359 case kXR_sync:
1360 case kXR_chkpoint:
1361 return Status();
1362
1363 //------------------------------------------------------------------------
1364 // kXR_locate
1365 //------------------------------------------------------------------------
1366 case kXR_locate:
1367 {
1368 AnyObject *obj = new AnyObject();
1369
1370 char *nullBuffer = new char[length+1];
1371 nullBuffer[length] = 0;
1372 memcpy( nullBuffer, buffer, length );
1373
1374 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1375 "LocateInfo: %s", pUrl.GetHostId().c_str(),
1376 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1377 LocationInfo *data = new LocationInfo();
1378
1379 if( data->ParseServerResponse( nullBuffer ) == false )
1380 {
1381 delete obj;
1382 delete data;
1383 delete [] nullBuffer;
1384 return Status( stError, errInvalidResponse );
1385 }
1386 delete [] nullBuffer;
1387
1388 obj->Set( data );
1389 response = obj;
1390 return Status();
1391 }
1392
1393 //------------------------------------------------------------------------
1394 // kXR_stat
1395 //------------------------------------------------------------------------
1396 case kXR_stat:
1397 {
1398 AnyObject *obj = new AnyObject();
1399
1400 //----------------------------------------------------------------------
1401 // Virtual File System stat (kXR_vfs)
1402 //----------------------------------------------------------------------
1403 if( req->stat.options & kXR_vfs )
1404 {
1405 StatInfoVFS *data = new StatInfoVFS();
1406
1407 char *nullBuffer = new char[length+1];
1408 nullBuffer[length] = 0;
1409 memcpy( nullBuffer, buffer, length );
1410
1411 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1412 "StatInfoVFS: %s", pUrl.GetHostId().c_str(),
1413 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1414
1415 if( data->ParseServerResponse( nullBuffer ) == false )
1416 {
1417 delete obj;
1418 delete data;
1419 delete [] nullBuffer;
1420 return Status( stError, errInvalidResponse );
1421 }
1422 delete [] nullBuffer;
1423
1424 obj->Set( data );
1425 }
1426 //----------------------------------------------------------------------
1427 // Normal stat
1428 //----------------------------------------------------------------------
1429 else
1430 {
1431 StatInfo *data = new StatInfo();
1432
1433 char *nullBuffer = new char[length+1];
1434 nullBuffer[length] = 0;
1435 memcpy( nullBuffer, buffer, length );
1436
1437 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as StatInfo: "
1438 "%s", pUrl.GetHostId().c_str(),
1439 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1440
1441 if( data->ParseServerResponse( nullBuffer ) == false )
1442 {
1443 delete obj;
1444 delete data;
1445 delete [] nullBuffer;
1446 return Status( stError, errInvalidResponse );
1447 }
1448 delete [] nullBuffer;
1449 obj->Set( data );
1450 }
1451
1452 response = obj;
1453 return Status();
1454 }
1455
1456 //------------------------------------------------------------------------
1457 // kXR_protocol
1458 //------------------------------------------------------------------------
1459 case kXR_protocol:
1460 {
1461 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ProtocolInfo",
1462 pUrl.GetHostId().c_str(),
1463 pRequest->GetObfuscatedDescription().c_str() );
1464
1465 if( rsp->hdr.dlen < 8 )
1466 {
1467 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
1468 pUrl.GetHostId().c_str() );
1469 return Status( stError, errInvalidResponse );
1470 }
1471
1472 AnyObject *obj = new AnyObject();
1473 ProtocolInfo *data = new ProtocolInfo( rsp->body.protocol.pval,
1474 rsp->body.protocol.flags );
1475 obj->Set( data );
1476 response = obj;
1477 return Status();
1478 }
1479
1480 //------------------------------------------------------------------------
1481 // kXR_dirlist
1482 //------------------------------------------------------------------------
1483 case kXR_dirlist:
1484 {
1485 AnyObject *obj = new AnyObject();
1486 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1487 "DirectoryList", pUrl.GetHostId().c_str(),
1488 pRequest->GetObfuscatedDescription().c_str() );
1489
1490 char *path = new char[req->dirlist.dlen+1];
1491 path[req->dirlist.dlen] = 0;
1492 memcpy( path, pRequest->GetBuffer(24), req->dirlist.dlen );
1493
1494 DirectoryList *data = new DirectoryList();
1495 data->SetParentName( path );
1496 delete [] path;
1497
1498 char *nullBuffer = new char[length+1];
1499 nullBuffer[length] = 0;
1500 memcpy( nullBuffer, buffer, length );
1501
1502 bool invalidrsp = false;
1503
1504 if( !pDirListStarted )
1505 {
1506 pDirListWithStat = DirectoryList::HasStatInfo( nullBuffer );
1507 pDirListStarted = true;
1508
1509 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer );
1510 }
1511 else
1512 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer, pDirListWithStat );
1513
1514 if( invalidrsp )
1515 {
1516 delete data;
1517 delete obj;
1518 delete [] nullBuffer;
1519 return Status( stError, errInvalidResponse );
1520 }
1521
1522 delete [] nullBuffer;
1523 obj->Set( data );
1524 response = obj;
1525 return Status();
1526 }
1527
1528 //------------------------------------------------------------------------
1529 // kXR_open - if we got the statistics, otherwise return 0
1530 //------------------------------------------------------------------------
1531 case kXR_open:
1532 {
1533 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as OpenInfo",
1534 pUrl.GetHostId().c_str(),
1535 pRequest->GetObfuscatedDescription().c_str() );
1536
1537 if( rsp->hdr.dlen < 4 )
1538 {
1539 log->Error( XRootDMsg, "[%s] Got invalid open response.",
1540 pUrl.GetHostId().c_str() );
1541 return Status( stError, errInvalidResponse );
1542 }
1543
1544 AnyObject *obj = new AnyObject();
1545 StatInfo *statInfo = 0;
1546
1547 //----------------------------------------------------------------------
1548 // Handle StatInfo if requested
1549 //----------------------------------------------------------------------
1550 if( req->open.options & kXR_retstat )
1551 {
1552 log->Dump( XRootDMsg, "[%s] Parsing StatInfo in response to %s",
1553 pUrl.GetHostId().c_str(),
1554 pRequest->GetObfuscatedDescription().c_str() );
1555
1556 if( rsp->hdr.dlen >= 12 )
1557 {
1558 char *nullBuffer = new char[rsp->hdr.dlen-11];
1559 nullBuffer[rsp->hdr.dlen-12] = 0;
1560 memcpy( nullBuffer, buffer+12, rsp->hdr.dlen-12 );
1561
1562 statInfo = new StatInfo();
1563 if( statInfo->ParseServerResponse( nullBuffer ) == false )
1564 {
1565 delete statInfo;
1566 statInfo = 0;
1567 }
1568 delete [] nullBuffer;
1569 }
1570
1571 if( rsp->hdr.dlen < 12 || !statInfo )
1572 {
1573 log->Error( XRootDMsg, "[%s] Unable to parse StatInfo in response "
1574 "to %s", pUrl.GetHostId().c_str(),
1575 pRequest->GetObfuscatedDescription().c_str() );
1576 delete obj;
1577 return Status( stError, errInvalidResponse );
1578 }
1579 }
1580
1581 OpenInfo *data = new OpenInfo( (uint8_t*)buffer,
1582 pResponse->GetSessionId(),
1583 statInfo );
1584 obj->Set( data );
1585 response = obj;
1586 return Status();
1587 }
1588
1589 //------------------------------------------------------------------------
1590 // kXR_read
1591 //------------------------------------------------------------------------
1592 case kXR_read:
1593 {
1594 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ChunkInfo",
1595 pUrl.GetHostId().c_str(),
1596 pRequest->GetObfuscatedDescription().c_str() );
1597
1598 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1599 {
1600 //--------------------------------------------------------------------
1601 // we are expecting to have only the header in the message, the raw
1602 // data have been readout into the user buffer
1603 //--------------------------------------------------------------------
1604 if( pPartialResps[i]->GetSize() > 8 )
1605 return Status( stOK, errInternal );
1606 }
1607 //----------------------------------------------------------------------
1608 // we are expecting to have only the header in the message, the raw
1609 // data have been readout into the user buffer
1610 //----------------------------------------------------------------------
1611 if( pResponse->GetSize() > 8 )
1612 return Status( stOK, errInternal );
1613 //----------------------------------------------------------------------
1614 // Get the response for the end user
1615 //----------------------------------------------------------------------
1616 return pBodyReader->GetResponse( response );
1617 }
1618
1619 //------------------------------------------------------------------------
1620 // kXR_pgread
1621 //------------------------------------------------------------------------
1622 case kXR_pgread:
1623 {
1624 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as PageInfo",
1625 pUrl.GetHostId().c_str(),
1626 pRequest->GetObfuscatedDescription().c_str() );
1627
1628 //----------------------------------------------------------------------
1629 // Glue in the cached responses if necessary
1630 //----------------------------------------------------------------------
1631 ChunkInfo chunk = pChunkList->front();
1632 bool sizeMismatch = false;
1633 uint32_t currentOffset = 0;
1634 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1635 {
1636 ServerResponseV2 *part = (ServerResponseV2*)pPartialResps[i]->GetBuffer();
1637
1638 //--------------------------------------------------------------------
1639 // the actual size of the raw data without the crc32c checksums
1640 //--------------------------------------------------------------------
1641 size_t datalen = part->status.bdy.dlen - NbPgPerRsp( part->info.pgread.offset,
1642 part->status.bdy.dlen ) * CksumSize;
1643
1644 if( currentOffset + datalen > chunk.length )
1645 {
1646 sizeMismatch = true;
1647 break;
1648 }
1649
1650 currentOffset += datalen;
1651 }
1652
1653 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
1654 size_t datalen = rspst->status.bdy.dlen - NbPgPerRsp( rspst->info.pgread.offset,
1655 rspst->status.bdy.dlen ) * CksumSize;
1656 if( currentOffset + datalen <= chunk.length )
1657 currentOffset += datalen;
1658 else
1659 sizeMismatch = true;
1660
1661 //----------------------------------------------------------------------
1662 // Overflow
1663 //----------------------------------------------------------------------
1664 if( pChunkStatus.front().sizeError || sizeMismatch )
1665 {
1666 log->Error( XRootDMsg, "[%s] Handling response to %s: user supplied "
1667 "buffer is too small for the received data.",
1668 pUrl.GetHostId().c_str(),
1669 pRequest->GetObfuscatedDescription().c_str() );
1670 return Status( stError, errInvalidResponse );
1671 }
1672
1673 AnyObject *obj = new AnyObject();
1674 PageInfo *pgInfo = new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1675 std::move( pCrc32cDigests) );
1676
1677 obj->Set( pgInfo );
1678 response = obj;
1679 return Status();
1680 }
1681
1682 //------------------------------------------------------------------------
1683 // kXR_pgwrite
1684 //------------------------------------------------------------------------
1685 case kXR_pgwrite:
1686 {
1687 std::vector<std::tuple<uint64_t, uint32_t>> retries;
1688
1689 ServerResponseV2 *rsp = (ServerResponseV2*)pResponse->GetBuffer();
1690 if( rsp->status.bdy.dlen > 0 )
1691 {
1692 ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) );
1693 size_t pgcnt = ( rsp->status.bdy.dlen - 8 ) / sizeof( kXR_int64 );
1694 retries.reserve( pgcnt );
1695 kXR_int64 *pgoffs = (kXR_int64*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) +
1696 sizeof( ServerResponseBody_pgWrCSE ) );
1697
1698 for( size_t i = 0; i < pgcnt; ++i )
1699 {
1700 uint32_t len = XrdSys::PageSize;
1701 if( i == 0 ) len = cse->dlFirst;
1702 else if( i == pgcnt - 1 ) len = cse->dlLast;
1703 retries.push_back( std::make_tuple( pgoffs[i], len ) );
1704 }
1705 }
1706
1707 RetryInfo *info = new RetryInfo( std::move( retries ) );
1708 AnyObject *obj = new AnyObject();
1709 obj->Set( info );
1710 response = obj;
1711
1712 return Status();
1713 }
1714
1715
1716 //------------------------------------------------------------------------
1717 // kXR_readv - we need to pass the length of the buffer to the user code
1718 //------------------------------------------------------------------------
1719 case kXR_readv:
1720 {
1721 log->Dump( XRootDMsg, "[%s] Parsing the response to %p as "
1722 "VectorReadInfo", pUrl.GetHostId().c_str(),
1723 pRequest->GetObfuscatedDescription().c_str() );
1724
1725 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1726 {
1727 //--------------------------------------------------------------------
1728 // we are expecting to have only the header in the message, the raw
1729 // data have been readout into the user buffer
1730 //--------------------------------------------------------------------
1731 if( pPartialResps[i]->GetSize() > 8 )
1732 return Status( stOK, errInternal );
1733 }
1734 //----------------------------------------------------------------------
1735 // we are expecting to have only the header in the message, the raw
1736 // data have been readout into the user buffer
1737 //----------------------------------------------------------------------
1738 if( pResponse->GetSize() > 8 )
1739 return Status( stOK, errInternal );
1740 //----------------------------------------------------------------------
1741 // Get the response for the end user
1742 //----------------------------------------------------------------------
1743 return pBodyReader->GetResponse( response );
1744 }
1745
1746 //------------------------------------------------------------------------
1747 // kXR_fattr
1748 //------------------------------------------------------------------------
1749 case kXR_fattr:
1750 {
1751 int len = rsp->hdr.dlen;
1752 char* data = rsp->body.buffer.data;
1753
1754 return ParseXAttrResponse( data, len, response );
1755 }
1756
1757 //------------------------------------------------------------------------
1758 // kXR_query
1759 //------------------------------------------------------------------------
1760 case kXR_query:
1761 case kXR_set:
1762 case kXR_prepare:
1763 default:
1764 {
1765 AnyObject *obj = new AnyObject();
1766 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as BinaryData",
1767 pUrl.GetHostId().c_str(),
1768 pRequest->GetObfuscatedDescription().c_str() );
1769
1770 BinaryDataInfo *data = new BinaryDataInfo();
1771 data->Allocate( length );
1772 data->Append( buffer, length );
1773 obj->Set( data );
1774 response = obj;
1775 return Status();
1776 }
1777 };
1778 return Status( stError, errInvalidMessage );
1779 }
1780
1781 //------------------------------------------------------------------------
1782 // Parse the response to kXR_fattr request and put it in an object that
1783 // could be passed to the user
1784 //------------------------------------------------------------------------
1785 Status XRootDMsgHandler::ParseXAttrResponse( char *data, size_t len,
1786 AnyObject *&response )
1787 {
1788 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1789// Log *log = DefaultEnv::GetLog(); //TODO
1790
1791 switch( req->fattr.subcode )
1792 {
1793 case kXR_fattrDel:
1794 case kXR_fattrSet:
1795 {
1796 Status status;
1797
1798 kXR_char nerrs = 0;
1799 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1800 return status;
1801
1802 kXR_char nattr = 0;
1803 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1804 return status;
1805
1806 std::vector<XAttrStatus> resp;
1807 // read the namevec
1808 for( kXR_char i = 0; i < nattr; ++i )
1809 {
1810 kXR_unt16 rc = 0;
1811 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1812 return status;
1813 rc = ntohs( rc );
1814
1815 // count errors
1816 if( rc ) --nerrs;
1817
1818 std::string name;
1819 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1820 return status;
1821
1822 XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1823 XRootDStatus();
1824 resp.push_back( XAttrStatus( name, st ) );
1825 }
1826
1827 // check if we read all the data and if the error count is OK
1828 if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1829
1830 // set up the response object
1831 response = new AnyObject();
1832 response->Set( new std::vector<XAttrStatus>( std::move( resp ) ) );
1833
1834 return Status();
1835 }
1836
1837 case kXR_fattrGet:
1838 {
1839 Status status;
1840
1841 kXR_char nerrs = 0;
1842 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1843 return status;
1844
1845 kXR_char nattr = 0;
1846 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1847 return status;
1848
1849 std::vector<XAttr> resp;
1850 resp.reserve( nattr );
1851
1852 // read the name vec
1853 for( kXR_char i = 0; i < nattr; ++i )
1854 {
1855 kXR_unt16 rc = 0;
1856 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1857 return status;
1858 rc = ntohs( rc );
1859
1860 // count errors
1861 if( rc ) --nerrs;
1862
1863 std::string name;
1864 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1865 return status;
1866
1867 XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1868 XRootDStatus();
1869 resp.push_back( XAttr( name, st ) );
1870 }
1871
1872 // read the value vec
1873 for( kXR_char i = 0; i < nattr; ++i )
1874 {
1875 kXR_int32 vlen = 0;
1876 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1877 return status;
1878 vlen = ntohl( vlen );
1879
1880 std::string value;
1881 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1882 return status;
1883
1884 resp[i].value.swap( value );
1885 }
1886
1887 // check if we read all the data and if the error count is OK
1888 if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1889
1890 // set up the response object
1891 response = new AnyObject();
1892 response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1893
1894 return Status();
1895 }
1896
1897 case kXR_fattrList:
1898 {
1899 Status status;
1900 std::vector<XAttr> resp;
1901
1902 while( len > 0 )
1903 {
1904 std::string name;
1905 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1906 return status;
1907
1908 kXR_int32 vlen = 0;
1909 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1910 return status;
1911 vlen = ntohl( vlen );
1912
1913 std::string value;
1914 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1915 return status;
1916
1917 resp.push_back( XAttr( name, value ) );
1918 }
1919
1920 // set up the response object
1921 response = new AnyObject();
1922 response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1923
1924 return Status();
1925 }
1926
1927 default:
1928 return Status( stError, errDataError );
1929 }
1930 }
1931
1932 //----------------------------------------------------------------------------
1933 // Perform the changes to the original request needed by the redirect
1934 // procedure - allocate new streamid, append redirection data and such
1935 //----------------------------------------------------------------------------
1936 Status XRootDMsgHandler::RewriteRequestRedirect( const URL &newUrl )
1937 {
1938 Log *log = DefaultEnv::GetLog();
1939
1940 Status st;
1941 // Append any "xrd.*" parameters present in newCgi so that any authentication
1942 // requirements are properly enforced
1943 const URL::ParamsMap &newCgi = newUrl.GetParams();
1944 std::string xrdCgi = "";
1945 std::ostringstream ossXrd;
1946 for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1947 {
1948 if( it->first.compare( 0, 4, "xrd." ) )
1949 continue;
1950 ossXrd << it->first << '=' << it->second << '&';
1951 }
1952
1953 xrdCgi = ossXrd.str();
1954 // Redirection URL containing also any original xrd.* opaque parameters
1955 XrdCl::URL authUrl;
1956
1957 if (xrdCgi.empty())
1958 {
1959 authUrl = newUrl;
1960 }
1961 else
1962 {
1963 std::string surl = newUrl.GetURL();
1964 (surl.find('?') == std::string::npos) ? (surl += '?') :
1965 ((*surl.rbegin() != '&') ? (surl += '&') : (surl += ""));
1966 surl += xrdCgi;
1967 if (!authUrl.FromString(surl))
1968 {
1969 std::string surlLog = surl;
1970 if( unlikely( log->GetLevel() >= Log::ErrorMsg ) ) {
1971 surlLog = obfuscateAuth(surlLog);
1972 }
1973 log->Error( XRootDMsg, "[%s] Failed to build redirection URL from data: %s",
1974 newUrl.GetHostId().c_str(), surl.c_str());
1975 return Status(stError, errInvalidRedirectURL);
1976 }
1977 }
1978
1979 //--------------------------------------------------------------------------
1980 // Rewrite particular requests
1981 //--------------------------------------------------------------------------
1983 MessageUtils::RewriteCGIAndPath( pRequest, newCgi, true, newUrl.GetPath() );
1985 return Status();
1986 }
1987
1988 //----------------------------------------------------------------------------
1989 // Some requests need to be rewritten also after getting kXR_wait
1990 //----------------------------------------------------------------------------
1991 Status XRootDMsgHandler::RewriteRequestWait()
1992 {
1993 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1994
1996
1997 //------------------------------------------------------------------------
1998 // For kXR_locate and kXR_open request the kXR_refresh bit needs to be
1999 // turned off after wait
2000 //------------------------------------------------------------------------
2001 switch( req->header.requestid )
2002 {
2003 case kXR_locate:
2004 {
2005 uint16_t refresh = kXR_refresh;
2006 req->locate.options &= (~refresh);
2007 break;
2008 }
2009
2010 case kXR_open:
2011 {
2012 uint16_t refresh = kXR_refresh;
2013 req->locate.options &= (~refresh);
2014 break;
2015 }
2016 }
2017
2020 return Status();
2021 }
2022
2023 //----------------------------------------------------------------------------
2024 // Recover error
2025 //----------------------------------------------------------------------------
2026 void XRootDMsgHandler::HandleError( XRootDStatus status )
2027 {
2028 //--------------------------------------------------------------------------
2029 // If there was no error then do nothing
2030 //--------------------------------------------------------------------------
2031 if( status.IsOK() )
2032 return;
2033
2034 if( pSidMgr && pMsgInFly && (
2035 status.code == errOperationExpired ||
2036 status.code == errOperationInterrupted ) )
2037 {
2038 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2039 pSidMgr->TimeOutSID( req->header.streamid );
2040 }
2041
2042 bool noreplicas = ( status.code == errErrorResponse &&
2043 status.errNo == kXR_noReplicas );
2044
2045 if( !noreplicas ) pLastError = status;
2046
2047 Log *log = DefaultEnv::GetLog();
2048 log->Debug( XRootDMsg, "[%s] Handling error while processing %s: %s.",
2049 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str(),
2050 status.ToString().c_str() );
2051
2052 //--------------------------------------------------------------------------
2053 // Check if it is a fatal TLS error that has been marked as potentially
2054 // recoverable, if yes check if we can downgrade from fatal to error.
2055 //--------------------------------------------------------------------------
2056 if( status.IsFatal() && status.code == errTlsError && status.errNo == EAGAIN )
2057 {
2058 if( pSslErrCnt < MaxSslErrRetry )
2059 {
2060 status.status &= ~stFatal; // switch off fatal&error bits
2061 status.status |= stError; // switch on error bit
2062 }
2063 ++pSslErrCnt; // count number of consecutive SSL errors
2064 }
2065 else
2066 pSslErrCnt = 0;
2067
2068 //--------------------------------------------------------------------------
2069 // We have got an error message, we can recover it at the load balancer if:
2070 // 1) we haven't got it from the load balancer
2071 // 2) we have a load balancer assigned
2072 // 3) the error is either one of: kXR_FSError, kXR_IOError, kXR_ServerError,
2073 // kXR_NotFound
2074 // 4) in the case of kXR_NotFound a kXR_refresh flags needs to be set
2075 //--------------------------------------------------------------------------
2076 if( status.code == errErrorResponse )
2077 {
2078 if( RetriableErrorResponse( status ) )
2079 {
2080 UpdateTriedCGI(status.errNo);
2081 if( status.errNo == kXR_NotFound || status.errNo == kXR_Overloaded )
2082 SwitchOnRefreshFlag();
2083 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2084 return;
2085 }
2086 else
2087 {
2088 pStatus = status;
2089 HandleRspOrQueue();
2090 return;
2091 }
2092 }
2093
2094 //--------------------------------------------------------------------------
2095 // Nothing can be done if:
2096 // 1) a user timeout has occurred
2097 // 2) has a non-zero session id
2098 // 3) if another error occurred and the validity of the message expired
2099 //--------------------------------------------------------------------------
2100 if( status.code == errOperationExpired || pRequest->GetSessionId() ||
2101 status.code == errOperationInterrupted || time(0) >= pExpiration )
2102 {
2103 log->Error( XRootDMsg, "[%s] Unable to get the response to request %s",
2104 pUrl.GetHostId().c_str(),
2105 pRequest->GetObfuscatedDescription().c_str() );
2106 pStatus = status;
2107 HandleRspOrQueue();
2108 return;
2109 }
2110
2111 //--------------------------------------------------------------------------
2112 // At this point we're left with connection errors, we recover them
2113 // at a load balancer if we have one and if not on the current server
2114 // until we get a response, an unrecoverable error or a timeout
2115 //--------------------------------------------------------------------------
2116 if( pLoadBalancer.url.IsValid() &&
2117 pLoadBalancer.url.GetLocation() != pUrl.GetLocation() )
2118 {
2119 UpdateTriedCGI( kXR_ServerError );
2120 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2121 return;
2122 }
2123 else
2124 {
2125 if( !status.IsFatal() && IsRetriable() )
2126 {
2127 log->Info( XRootDMsg, "[%s] Retrying request: %s.",
2128 pUrl.GetHostId().c_str(),
2129 pRequest->GetObfuscatedDescription().c_str() );
2130
2131 UpdateTriedCGI( kXR_ServerError );
2132 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryRetry ) );
2133 return;
2134 }
2135 pStatus = status;
2136 HandleRspOrQueue();
2137 return;
2138 }
2139 }
2140
2141 //----------------------------------------------------------------------------
2142 // Retry the message at another server
2143 //----------------------------------------------------------------------------
2144 Status XRootDMsgHandler::RetryAtServer( const URL &url, RedirectEntry::Type entryType )
2145 {
2146 // prepare to possibly be requeued in the out-queue for a different channel.
2147 // reset sendingstate; this is reset by OnReadyToSend() when our message is
2148 // removed from out-queue, however OnStatusReady() may be called before that
2149 // in case something happens before sending has been attempted. (e.g. stream
2150 // broken or request timeout)
2151 pSendingState = 0;
2152
2153 pResponse.reset();
2154 Log *log = DefaultEnv::GetLog();
2155
2156 //--------------------------------------------------------------------------
2157 // Set up a redirect entry
2158 //--------------------------------------------------------------------------
2159 if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2160 pRdirEntry.reset( new RedirectEntry( pUrl.GetLocation(), url.GetLocation(), entryType ) );
2161
2162 if( pUrl.GetLocation() != url.GetLocation() )
2163 {
2164 pHosts->push_back( url );
2165
2166 //------------------------------------------------------------------------
2167 // Assign a new stream id to the message
2168 //------------------------------------------------------------------------
2169
2170 // first release the old stream id
2171 // (though it could be a redirect from a local
2172 // metalink file, in this case there's no SID)
2173 ClientRequestHdr *req = (ClientRequestHdr*)pRequest->GetBuffer();
2174 if( pSidMgr )
2175 {
2176 pSidMgr->ReleaseSID( req->streamid );
2177 pSidMgr.reset();
2178 }
2179
2180 // then get the new SIDManager
2181 // (again this could be a redirect to a local
2182 // file and in this case there is no SID)
2183 if( !url.IsLocalFile() )
2184 {
2185 pSidMgr = SIDMgrPool::Instance().GetSIDMgr( url );
2186 Status st = pSidMgr->AllocateSID( req->streamid );
2187 if( !st.IsOK() )
2188 {
2189 log->Error( XRootDMsg, "[%s] Impossible to send message %s.",
2190 pUrl.GetHostId().c_str(),
2191 pRequest->GetObfuscatedDescription().c_str() );
2192 return st;
2193 }
2194 }
2195
2196 pUrl = url;
2197 }
2198
2199 if( pUrl.IsMetalink() && pFollowMetalink )
2200 {
2201 log->Debug( ExDbgMsg, "[%s] Metaling redirection for MsgHandler: %p (message: %s ).",
2202 pUrl.GetHostId().c_str(), this,
2203 pRequest->GetObfuscatedDescription().c_str() );
2204
2205 return pPostMaster->Redirect( pUrl, pRequest, this );
2206 }
2207 else if( pUrl.IsLocalFile() )
2208 {
2209 HandleLocalRedirect( &pUrl );
2210 return Status();
2211 }
2212 else
2213 {
2214 log->Debug( ExDbgMsg, "[%s] Retry at server MsgHandler: %p (message: %s ).",
2215 pUrl.GetHostId().c_str(), this,
2216 pRequest->GetObfuscatedDescription().c_str() );
2217 return pPostMaster->Send( pUrl, pRequest, this, true, pExpiration );
2218 }
2219 }
2220
2221 //----------------------------------------------------------------------------
2222 // Update the "tried=" part of the CGI of the current message
2223 //----------------------------------------------------------------------------
2224 void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2225 {
2226 URL::ParamsMap cgi;
2227 std::string tried;
2228
2229 //--------------------------------------------------------------------------
2230 // In case a data server responded with a kXR_redirect and we fail at the
2231 // node where we were redirected to, the original data server should be
2232 // included in the tried CGI opaque info (instead of the current one).
2233 //--------------------------------------------------------------------------
2234 if( pEffectiveDataServerUrl )
2235 {
2236 tried = pEffectiveDataServerUrl->GetHostName();
2237 delete pEffectiveDataServerUrl;
2238 pEffectiveDataServerUrl = 0;
2239 }
2240 //--------------------------------------------------------------------------
2241 // Otherwise use the current URL.
2242 //--------------------------------------------------------------------------
2243 else
2244 tried = pUrl.GetHostName();
2245
2246 // Report the reason for the failure to the next location
2247 //
2248 if (errNo)
2249 { if (errNo == kXR_NotFound) cgi["triedrc"] = "enoent";
2250 else if (errNo == kXR_IOError) cgi["triedrc"] = "ioerr";
2251 else if (errNo == kXR_FSError) cgi["triedrc"] = "fserr";
2252 else if (errNo == kXR_ServerError) cgi["triedrc"] = "srverr";
2253 }
2254
2255 //--------------------------------------------------------------------------
2256 // If our current load balancer is a metamanager and we failed either
2257 // at a diskserver or at an unidentified node we also exclude the last
2258 // known manager
2259 //--------------------------------------------------------------------------
2260 if( pLoadBalancer.url.IsValid() && (pLoadBalancer.flags & kXR_attrMeta) )
2261 {
2262 HostList::reverse_iterator it;
2263 for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2264 {
2265 if( it->loadBalancer )
2266 break;
2267
2268 tried += "," + it->url.GetHostName();
2269
2270 if( it->flags & kXR_isManager )
2271 break;
2272 }
2273 }
2274
2275 cgi["tried"] = tried;
2277 MessageUtils::RewriteCGIAndPath( pRequest, cgi, false, "" );
2279 }
2280
2281 //----------------------------------------------------------------------------
2282 // Switch on the refresh flag for some requests
2283 //----------------------------------------------------------------------------
2284 void XRootDMsgHandler::SwitchOnRefreshFlag()
2285 {
2287 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2288 switch( req->header.requestid )
2289 {
2290 case kXR_locate:
2291 {
2292 req->locate.options |= kXR_refresh;
2293 break;
2294 }
2295
2296 case kXR_open:
2297 {
2298 req->locate.options |= kXR_refresh;
2299 break;
2300 }
2301 }
2304 }
2305
2306 //------------------------------------------------------------------------
2307 // If the current thread is a worker thread from our thread-pool
2308 // handle the response, otherwise submit a new task to the thread-pool
2309 //------------------------------------------------------------------------
2310 void XRootDMsgHandler::HandleRspOrQueue()
2311 {
2312 //--------------------------------------------------------------------------
2313 // Is it a final response?
2314 //--------------------------------------------------------------------------
2315 bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
2316 if( finalrsp )
2317 {
2318 // Do not do final processing of the response if we haven't had
2319 // confirmation the original request was sent (via OnStatusReady).
2320 // The final processing will be triggered when we get the confirm.
2321 const int sst = pSendingState.fetch_or( kFinalResp );
2322 if( !( sst & kSendDone ) )
2323 return;
2324 }
2325
2326 JobManager *jobMgr = pPostMaster->GetJobManager();
2327 if( jobMgr->IsWorker() )
2328 HandleResponse();
2329 else
2330 {
2331 Log *log = DefaultEnv::GetLog();
2332 log->Debug( ExDbgMsg, "[%s] Passing to the thread-pool MsgHandler: %p (message: %s ).",
2333 pUrl.GetHostId().c_str(), this,
2334 pRequest->GetObfuscatedDescription().c_str() );
2335 jobMgr->QueueJob( new HandleRspJob( this ), 0 );
2336 }
2337 }
2338
2339 //------------------------------------------------------------------------
2340 // Notify the FileStateHandler to retry Open() with new URL
2341 //------------------------------------------------------------------------
2342 void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2343 {
2344 Log *log = DefaultEnv::GetLog();
2345 log->Debug( ExDbgMsg, "[%s] Handling local redirect - MsgHandler: %p (message: %s ).",
2346 pUrl.GetHostId().c_str(), this,
2347 pRequest->GetObfuscatedDescription().c_str() );
2348
2349 if( !pLFileHandler )
2350 {
2351 HandleError( XRootDStatus( stFatal, errNotSupported ) );
2352 return;
2353 }
2354
2355 AnyObject *resp = 0;
2356 pLFileHandler->SetHostList( *pHosts );
2357 XRootDStatus st = pLFileHandler->Open( url, pRequest, resp );
2358 if( !st.IsOK() )
2359 {
2360 HandleError( st );
2361 return;
2362 }
2363
2364 pResponseHandler->HandleResponseWithHosts( new XRootDStatus(),
2365 resp,
2366 pHosts.release() );
2367 delete this;
2368
2369 return;
2370 }
2371
2372 //------------------------------------------------------------------------
2373 // Check if it is OK to retry this request
2374 //------------------------------------------------------------------------
2375 bool XRootDMsgHandler::IsRetriable()
2376 {
2377 std::string value;
2378 DefaultEnv::GetEnv()->GetString( "OpenRecovery", value );
2379 if( value == "true" ) return true;
2380
2381 // check if it is a mutable open (open + truncate or open + create)
2382 ClientRequest *req = reinterpret_cast<ClientRequest*>( pRequest->GetBuffer() );
2383 if( req->header.requestid == htons( kXR_open ) )
2384 {
2385 bool _mutable = ( req->open.options & htons( kXR_delete ) ) ||
2386 ( req->open.options & htons( kXR_new ) );
2387
2388 if( _mutable )
2389 {
2390 Log *log = DefaultEnv::GetLog();
2391 log->Debug( XRootDMsg,
2392 "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2393 pUrl.GetHostId().c_str(),
2394 pRequest->GetObfuscatedDescription().c_str() );
2395 // disallow retry if it is a mutable open
2396 return false;
2397 }
2398 }
2399
2400 return true;
2401 }
2402
2403 //------------------------------------------------------------------------
2404 // Check if for given request and Metalink redirector it is OK to omit
2405 // the kXR_wait and proceed straight to the next entry in the Metalink file
2406 //------------------------------------------------------------------------
2407 bool XRootDMsgHandler::OmitWait( Message &request, const URL &url )
2408 {
2409 // we can omit kXR_wait only if we have a Metalink redirector
2410 if( !url.IsMetalink() )
2411 return false;
2412
2413 // we can omit kXR_wait only for requests that can be redirected
2414 // (kXR_read is the only stateful request that can be redirected)
2415 ClientRequest *req = reinterpret_cast<ClientRequest*>( request.GetBuffer() );
2416 if( pStateful && req->header.requestid != kXR_read )
2417 return false;
2418
2419 // we can only omit kXR_wait if the Metalink redirect has more
2420 // replicas
2421 RedirectorRegistry &registry = RedirectorRegistry::Instance();
2422 VirtualRedirector *redirector = registry.Get( url );
2423
2424 // we need more than one server as the current one is not reflected
2425 // in tried CGI
2426 if( redirector->Count( request ) > 1 )
2427 return true;
2428
2429 return false;
2430 }
2431
2432 //------------------------------------------------------------------------
2433 // Checks if the given error returned by server is retriable.
2434 //------------------------------------------------------------------------
2435 bool XRootDMsgHandler::RetriableErrorResponse( const Status &status )
2436 {
2437 // we can only retry error response if we have a valid load-balancer and
2438 // it is not our current URL
2439 if( !( pLoadBalancer.url.IsValid() &&
2440 pUrl.GetLocation() != pLoadBalancer.url.GetLocation() ) )
2441 return false;
2442
2443 // following errors are retriable at any load-balancer
2444 if( status.errNo == kXR_FSError || status.errNo == kXR_IOError ||
2445 status.errNo == kXR_ServerError || status.errNo == kXR_NotFound ||
2446 status.errNo == kXR_Overloaded || status.errNo == kXR_NoMemory )
2447 return true;
2448
2449 // check if the load-balancer is a meta-manager, if yes there are
2450 // more errors that can be recovered
2451 if( !( pLoadBalancer.flags & kXR_attrMeta ) ) return false;
2452
2453 // those errors are retriable for meta-managers
2454 if( status.errNo == kXR_Unsupported || status.errNo == kXR_FileLocked )
2455 return true;
2456
2457 // in case of not-authorized error there is an imposed upper limit
2458 // on how many times we can retry this error
2459 if( status.errNo == kXR_NotAuthorized )
2460 {
2462 DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", limit );
2463 bool ret = pNotAuthorizedCounter < limit;
2464 ++pNotAuthorizedCounter;
2465 if( !ret )
2466 {
2467 Log *log = DefaultEnv::GetLog();
2468 log->Error( XRootDMsg,
2469 "[%s] Reached limit of NotAuthorized retries!",
2470 pUrl.GetHostId().c_str() );
2471 }
2472 return ret;
2473 }
2474
2475 // check if the load-balancer is a virtual (metalink) redirector,
2476 // if yes there are even more errors that can be recovered
2477 if( !( pLoadBalancer.flags & kXR_attrVirtRdr ) ) return false;
2478
2479 // those errors are retriable for virtual (metalink) redirectors
2480 if( status.errNo == kXR_noserver || status.errNo == kXR_ArgTooLong )
2481 return true;
2482
2483 // otherwise it is a non-retriable error
2484 return false;
2485 }
2486
2487 //------------------------------------------------------------------------
2488 // Dump the redirect-trace-back into the log file
2489 //------------------------------------------------------------------------
2490 void XRootDMsgHandler::DumpRedirectTraceBack()
2491 {
2492 if( pRedirectTraceBack.empty() ) return;
2493
2494 std::stringstream sstrm;
2495
2496 sstrm << "Redirect trace-back:\n";
2497
2498 int counter = 0;
2499
2500 auto itr = pRedirectTraceBack.begin();
2501 sstrm << '\t' << counter << ". " << (*itr)->ToString() << '\n';
2502
2503 auto prev = itr;
2504 ++itr;
2505 ++counter;
2506
2507 for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2508 sstrm << '\t' << counter << ". "
2509 << (*itr)->ToString( (*prev)->status.IsOK() ) << '\n';
2510
2511 int authlimit = DefaultNotAuthorizedRetryLimit;
2512 DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", authlimit );
2513
2514 bool warn = !pStatus.IsOK() &&
2515 ( pStatus.code == errNotFound ||
2516 pStatus.code == errRedirectLimit ||
2517 ( pStatus.code == errAuthFailed && pNotAuthorizedCounter >= authlimit ) );
2518
2519 Log *log = DefaultEnv::GetLog();
2520 if( warn )
2521 log->Warning( XRootDMsg, "%s", sstrm.str().c_str() );
2522 else
2523 log->Debug( XRootDMsg, "%s", sstrm.str().c_str() );
2524 }
2525
2526 // Read data from buffer
2527 //------------------------------------------------------------------------
2528 template<typename T>
2529 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, T& result )
2530 {
2531 if( sizeof( T ) > buflen ) return Status( stError, errDataError );
2532
2533 memcpy(&result, buffer, sizeof(T));
2534
2535 buffer += sizeof( T );
2536 buflen -= sizeof( T );
2537
2538 return Status();
2539 }
2540
2541 //------------------------------------------------------------------------
2542 // Read a string from buffer
2543 //------------------------------------------------------------------------
2544 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result )
2545 {
2546 Status status;
2547 char c = 0;
2548
2549 while( true )
2550 {
2551 if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2552 return status;
2553
2554 if( c == 0 ) break;
2555 result += c;
2556 }
2557
2558 return status;
2559 }
2560
2561 //------------------------------------------------------------------------
2562 // Read a string from buffer
2563 //------------------------------------------------------------------------
2564 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen,
2565 size_t size, std::string &result )
2566 {
2567 Status status;
2568
2569 if( size > buflen ) return Status( stError, errDataError );
2570
2571 result.append( buffer, size );
2572 buffer += size;
2573 buflen -= size;
2574
2575 return status;
2576 }
2577
2578}
@ kXR_NotAuthorized
@ kXR_NotFound
@ kXR_FileLocked
Definition XProtocol.hh:993
@ kXR_noReplicas
@ kXR_Unsupported
@ kXR_ServerError
@ kXR_Overloaded
@ kXR_ArgTooLong
Definition XProtocol.hh:992
@ kXR_noserver
@ kXR_IOError
Definition XProtocol.hh:997
@ kXR_FSError
Definition XProtocol.hh:995
@ kXR_NoMemory
Definition XProtocol.hh:998
#define kXR_isManager
union ServerResponse::@0 body
@ kXR_fattrDel
Definition XProtocol.hh:270
@ kXR_fattrSet
Definition XProtocol.hh:273
@ kXR_fattrList
Definition XProtocol.hh:272
@ kXR_fattrGet
Definition XProtocol.hh:271
struct ClientFattrRequest fattr
Definition XProtocol.hh:854
#define kXR_collapseRedir
ServerResponseStatus status
#define kXR_attrMeta
kXR_char streamid[2]
Definition XProtocol.hh:156
kXR_char streamid[2]
Definition XProtocol.hh:914
kXR_unt16 options
Definition XProtocol.hh:481
struct ClientDirlistRequest dirlist
Definition XProtocol.hh:852
static const int kXR_ckpXeq
Definition XProtocol.hh:216
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_refresh
Definition XProtocol.hh:459
@ kXR_new
Definition XProtocol.hh:455
@ kXR_retstat
Definition XProtocol.hh:463
struct ClientOpenRequest open
Definition XProtocol.hh:860
@ kXR_waitresp
Definition XProtocol.hh:906
@ kXR_redirect
Definition XProtocol.hh:904
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_status
Definition XProtocol.hh:907
@ kXR_ok
Definition XProtocol.hh:899
@ kXR_attn
Definition XProtocol.hh:901
@ kXR_wait
Definition XProtocol.hh:905
@ kXR_error
Definition XProtocol.hh:903
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
Definition XProtocol.hh:846
#define kXR_recoverWrts
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_mkdir
Definition XProtocol.hh:120
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_chmod
Definition XProtocol.hh:114
@ kXR_dirlist
Definition XProtocol.hh:116
@ kXR_fattr
Definition XProtocol.hh:132
@ kXR_rm
Definition XProtocol.hh:126
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_set
Definition XProtocol.hh:130
@ kXR_rmdir
Definition XProtocol.hh:127
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_protocol
Definition XProtocol.hh:118
@ kXR_mv
Definition XProtocol.hh:121
@ kXR_ping
Definition XProtocol.hh:123
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_pgread
Definition XProtocol.hh:142
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_locate
Definition XProtocol.hh:139
@ kXR_close
Definition XProtocol.hh:115
@ kXR_pgwrite
Definition XProtocol.hh:138
@ kXR_prepare
Definition XProtocol.hh:133
#define kXR_isServer
#define kXR_attrVirtRdr
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:849
struct ServerResponseHeader hdr
union ServerResponseV2::@1 info
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
@ kXR_vfs
Definition XProtocol.hh:763
struct ClientStatRequest stat
Definition XProtocol.hh:873
#define kXR_ecRedir
struct ClientLocateRequest locate
Definition XProtocol.hh:856
ServerResponseHeader hdr
long long kXR_int64
Definition XPtypes.hh:98
int kXR_int32
Definition XPtypes.hh:89
unsigned short kXR_unt16
Definition XPtypes.hh:67
unsigned char kXR_char
Definition XPtypes.hh:65
#define unlikely(x)
std::string obfuscateAuth(const std::string &input)
void Get(Type &object)
Retrieve the object being held.
Object for reading out data from the PgRead response.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
void SetCursor(uint32_t cursor)
Set the cursor.
uint32_t GetCursor() const
Get append cursor.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Interface for a job to be run by the job manager.
void SetHostList(const HostList &hostList)
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
Handle diagnostics.
Definition XrdClLog.hh:101
@ ErrorMsg
report errors
Definition XrdClLog.hh:109
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
A network socket.
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:465
const std::string & GetPassword() const
Get the password.
Definition XrdClURL.hh:153
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
Definition XrdClURL.cc:62
void SetPassword(const std::string &password)
Set the password.
Definition XrdClURL.hh:161
void SetParams(const std::string &params)
Set params.
Definition XrdClURL.cc:402
const std::string & GetUserName() const
Get the username.
Definition XrdClURL.hh:135
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition XrdClURL.cc:344
const std::string & GetHostName() const
Get the name of the target host.
Definition XrdClURL.hh:170
bool IsLocalFile() const
Definition XrdClURL.cc:474
void SetProtocol(const std::string &protocol)
Set protocol.
Definition XrdClURL.hh:126
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:452
void SetUserName(const std::string &userName)
Set the username.
Definition XrdClURL.hh:143
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Handle/Process/Forward XRootD messages.
virtual uint16_t InspectStatusRsp() override
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
const Message * GetRequest() const
Get the request pointer.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
virtual void Process() override
Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
virtual bool IsRaw() const override
Are we a raw writer or not?
const std::string & GetErrorMessage() const
Get error message.
static void SetDescription(Message *msg)
Get the description of a message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
const uint16_t errRedirectLimit
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errNotFound
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint64_t ExDbgMsg
const uint16_t errInvalidResponse
const uint16_t errInvalidRedirectURL
const uint16_t errNotSupported
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
const uint16_t suContinue
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
none object for initializing empty Optional
XrdSysError Log
Definition XrdConfig.cc:113
@ kXR_PartialResult
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
URL url
URL of the host.
uint32_t flags
Host type.
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version