82{
84
85
86
88 xp->
Init(rsltP, linkP, seceP, nameP, protP);
89 return xp;
90}
91
92
93
94
95
97 const struct iovec *ioV, int ioN, int ioL)
98{
100
101
102
104 {
TRACE(REQ,
"Unable to find request for " <<lP->
ID <<
" sid=" <<*theSID);
105 return 0;
106 }
107
108
109
110 return tP->
bridge->AttnCont(tP, rcode, ioV, ioN, ioL);
111}
112
113
114
115
116
118 const struct iovec *ioV, int ioN, int ioL)
119{
120 int rc;
121
122
123
125 delete tP;
126 runWait = 0;
127
128
129
131 && (!ioN ||
XRD_GETNUM(ioV[0].iov_base) == 0))
133 return 0;
134 }
135
136
137
138 rc =
Send(rcode, ioV, ioN, ioL);
139
140
141
142 if (rc >= 0 && !runWait)
143 {if (runDone) runStatus.store(0, std::memory_order_release);
146 }
147
148
149
150 return rc;
151}
152
153
154
155
156
158{
159 char buff[128];
160
161
162
163 if (runStatus.fetch_add(1, std::memory_order_acq_rel)) return false;
164
165
166
168
169
170
171 sprintf(buff, "%s disconnection", pName);
173
174
175
176 TranStack.
Push(&TranLink);
177 return true;
178}
179
180
181
182
183
184bool XrdXrootdTransit::Fail(int ecode, const char *etext)
185{
186 runError = ecode;
187 runEText = etext;
188 return true;
189}
190
191
192
193
194
// Report the recorded failure (runError / runEText) through respObj->Error().
// Returns the caller-supplied rc when Error() yields true, otherwise -1.
// NOTE(review): the declaration of rInfo is not visible in this listing
// (lines are missing between the opening brace and the return) -- confirm
// against the complete file.
195int XrdXrootdTransit::Fatal(int rc)
196{
199
200 return (respObj->
Error(rInfo, runError, runEText) ? rc : -1);
201}
202
203
204
205
206
208{
210 TranStack.
Set(qMax, qTTL);
211}
212
213
214
218 const char *nameP,
219 const char *protP
220 )
221{
223 const char *who;
225
226
227
228 runArgs = 0;
229 runALen = 0;
230 runABsz = 0;
231 runError = 0;
232 runStatus.store(0, std::memory_order_release);
233 runWait = 0;
234 runWTot = 0;
235 runWMax = 3600;
236 runWCall = false;
237 runDone = false;
238 reInvoke = false;
239 wBuff = 0;
240 wBLen = 0;
241 respObj = respP;
242 pName = protP;
244
245
246
253
254
255
256 strncpy(uname, nameP, sizeof(uname)-1);
257 uname[sizeof(uname)-1] = 0;
260
261
262
263
264
266
267
268
269
273
274
275
278
279
280
285
286
287
293 }
294 }
295
296
297
299
300
301
305
306
307
308 who = (seceP && seceP->
name ? seceP->
name :
"nobody");
310
311
312
314 cTime = time(0);
315
316
317
319}
320
321
322
323
324
326{
327 int rc;
328
329
330
332 else rc = 0;
333
334
335
337 else if (rc != -EINPROGRESS)
Link->
Close();
338}
339
340
341
342
343
345{
346 int rc;
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361do{rc = realProt->
Process((reInvoke ? 0 : lp));
362 if (rc >= 0 && runStatus.load(std::memory_order_acquire))
363 {reInvoke = (rc == 0);
364 if (runError) rc = Fatal(rc);
365 else {runDone = false;
367 if (rc >= 0)
368 {if (runWait) rc = -EINPROGRESS;
369 if (!runDone) return rc;
370 runStatus.store(0, std::memory_order_release);
371 }
372 }
373 } else reInvoke = false;
374 } while(rc >= 0 && reInvoke);
375
376
377
378 runStatus.store(0, std::memory_order_release);
379
380
381
382 return (rc ? rc : 1);
383}
384
385
386
387
388
390{
391
392
393
394 runStatus.fetch_add(1, std::memory_order_acq_rel);
395
396
397
398
399
400 if (runWait > 0) {
401 TRACEP(
EMSG,
"WARNING: Recycle is canceling wait job; the wait job might already be running during recycle.");
403 }
404
405
406
407 if (realProt) realProt->
Recycle(lp, consec, reason);
408
409
410
412
413
414
415 if (runArgs) {free(runArgs); runArgs = 0;}
416
417
418
420
421
422
423 TranStack.
Push(&TranLink);
424}
425
426
427
428
429
431{
433 static char eText[] = "Insufficent memory to re-issue request";
434 static struct iovec ioV[] = {{(char *)&eCode,sizeof(eCode)},
435 {(char *)&eText,sizeof(eText)}};
436 int rc;
437
438
439
440 TRACEP(REQ,
"Bridge redrive runStatus="<<runStatus.load(std::memory_order_acquire)
441 <<" runError="<<runError
442 <<" runWait="<<runWait<<" runWTot="<<runWTot);
443
444
445
446 runWTot += runWait;
447 runWait = 0;
448
449
450
451
452
453
454
455
456
457
458 if (!runALen || RunCopy(runArgs, runALen)) {
460 TRACEP(REQ,
"Bridge redrive Process2 rc="<<rc
461 <<" runError="<<runError<<" runWait="<<runWait);
462 if (rc == 0 && !runWait && !runError) {
464 TRACEP(REQ,
"Bridge redrive callback rc="<<rc
465 <<" runStatus="<<runStatus.load(std::memory_order_acquire));
466 }
467 } while((rc == 0) && !runError && !runWait);
468 }
470
471
472
473 if (rc >= 0 && runWait) return;
474 runWTot = 0;
475
476
477
478 runStatus.store(0, std::memory_order_release);
479
480
481
482
485}
486
487
488
489
490
// Convert a request code into a zero-based offset relative to kXR_auth.
// Both the argument and the whole expansion are parenthesized so the macro
// evaluates correctly inside larger expressions (e.g. 2 * KXR_INDEX(v)) and
// with arguments that themselves contain operators.
#define KXR_INDEX(x) ((x)-kXR_auth)
492
494{
496
497
498
499 memset(rTab, 0, sizeof(rTab));
520
521
522
523 return rTab;
524}
525
526
527
528
529
// Handle the data portion of a write request supplied by the caller.
//   xdataP - pointer to the caller's data buffer
//   xdataL - length of that buffer
// Every return statement visible in this listing returns true.
// NOTE(review): several statements of this function (including the condition
// guarding the block below) are missing from this listing -- confirm the full
// logic against the complete file.
530bool XrdXrootdTransit::ReqWrite(char *xdataP, int xdataL)
531{
532
533
534
536
537
538
// Visible branch: clear Resume and remember the caller's buffer and length
// in wBuff / wBLen before returning.
540 {
Resume = 0; wBuff = xdataP; wBLen = xdataL;
541 return true;
542 }
543
544
545
546
550 return true;
551}
552
553
554
555
556
558{
559 int movLen;
560
561
562
563
564
565 if (runStatus.fetch_add(1, std::memory_order_acq_rel))
566 {
TRACEP(REQ,
"Bridge request failed due to re-entry");
567 return false;
568 }
569
570
571
573
574
575
580 {
TRACEP(REQ,
"Unsupported bridge request");
582 }
583
584
585
588 {
TRACEP(REQ,
"Invalid request data length");
590 }
591
592
593
597
598
599
601
602
603
604
605
608 if (!RunCopy(xdataP, movLen)) return true;
609 if (!runArgs || movLen > runABsz)
610 {if (runArgs) free(runArgs);
611 if (!(runArgs = (char *)malloc(movLen)))
612 {
TRACEP(REQ,
"Failed to allocate memory");
614 }
615 runABsz = movLen;
616 }
617 memcpy(runArgs, xdataP, movLen); runALen = movLen;
621 return true;
622 }
623 } else runALen = 0;
624
625
626
627 runError = 0;
629 return true;
630}
631
632
633
634
635
// Copy request argument data for the request being run.
//   buffP - source buffer
//   buffL - number of bytes in the source buffer
// Returns false after recording kXR_ArgTooLong via Fail(); otherwise true.
// NOTE(review): the size check that triggers the failure and the copy
// statements themselves are missing from this listing -- presumably the
// arguments are copied into an internal buffer; verify against the full file.
636bool XrdXrootdTransit::RunCopy(char *buffP, int buffL)
637{
638
639
640
// Failure path: record the error for later reporting and tell the caller
// the copy did not happen.
644 {Fail(
kXR_ArgTooLong,
"Request argument too long");
return false;}
646 }
647
648
649
652 return true;
653}
654
655
656
657
658
660{
664 int rc;
665 bool aOK;
666
667
668
669 runDone = true;
670 switch(rcode)
673 eMsg = (ioN < 2 ?
"" : (
const char *)ioV[1].iov_base);
674 if (wBuff) respObj->
Free(rInfo, wBuff, wBLen);
676 break;
678 if (wBuff) respObj->
Free(rInfo, wBuff, wBLen);
679 aOK = (ioN ? respObj->
Data(rInfo, ioV, ioN, ioL,
true)
680 : respObj->
Done(rInfo));
681 break;
683 aOK = respObj->
Data(rInfo, ioV, ioN, ioL,
false);
684 runDone = false;
685 break;
687 if (wBuff) respObj->
Free(rInfo, wBuff, wBLen);
689 aOK = respObj->
Redir(rInfo,rc,(
const char *)ioV[1].iov_base);
690 break;
692 return Wait(rInfo, ioV, ioN, ioL);
693 break;
695 runDone = false;
696 return WaitResp(rInfo, ioV, ioN, ioL);
697 break;
698 default:
if (wBuff) respObj->
Free(rInfo, wBuff, wBLen);
700 "internal logic error");
701 break;
702 };
703
704
705
706 return (aOK ? 0 : -1);
707}
708
709
710
712{
715 offset, dlen, fdnum);
716
717
718
719 runDone = true;
720 return (respObj->
File(sfInfo, dlen) ? 0 : -1);
721}
722
723
724
726{
729 sfvec, sfvnum, dlen);
730
731
732
733 runDone = true;
734 return (respObj->
File(sfInfo, dlen) ? 0 : -1);
735}
736
737
738
739
740
742 const struct iovec *ioV, int ioN, int ioL)
743{
745
746
747
749 eMsg = (ioN < 2 ?
"reason unknown" : (
const char *)ioV[1].iov_base);
750
751
752
753 if (runWMax <= 0)
754 {int wtime = runWait;
755 runWait = 0;
756 return (respObj->
Wait(rInfo, wtime,
eMsg) ? 0 : -1);
757 }
758
759
760
761 if (runWTot >= runWMax)
762 {runDone = true;
763 runWait = 0;
765 }
766
767
768
769 if (runWait > runWMax) runWait = runWMax;
770
771
772
773 if (runWCall && !(respObj->
Wait(rInfo, runWait,
eMsg)))
return -1;
774
775
776
777 TRACEP(REQ,
"Bridge delaying request " <<runWait <<
" sec (" <<
eMsg <<
")");
779 return 0;
780}
781
782
783
784
785
787 const struct iovec *ioV, int ioN, int ioL)
788{
791 int wTime;
792
793
794
796 eMsg = (ioN < 2 ?
"reason unknown" : (
const char *)ioV[1].iov_base);
797 TRACEP(REQ,
"Bridge waiting for resp; sid=" <<rInfo.
sID.num
798 <<
" wt=" <<wTime <<
" (" <<
eMsg <<
")");
799
800
801
802
803
804
805
806
809
810
811
812 runWait = -1;
813 return 0;
814}
struct ClientRequestHdr header
struct ClientLoginRequest login
XrdOucTrace * XrdXrootdTrace
void Release(XrdBuffer *bp)
XrdBuffer * Obtain(int bsz)
XrdProtocol * setProtocol(XrdProtocol *pp, bool runit=false, bool push=false)
int Close(bool defer=false)
void setID(const char *userid, int procid)
void Enable()
Enable the link to field interrupts.
XrdNetAddrInfo * AddrInfo()
char * ID
Pointer to the client's link identity.
void armBridge()
Mark this link as an in-memory communications bridge (internal use only).
void setProtName(const char *name)
const char * Host() const
unsigned int Inst() const
bool isIPType(IPType ipType) const
void Set(int inQMax, time_t agemax=1800)
void Push(XrdObject< T > *Node)
static void Sanitize(char *instr, char subc='_')
virtual void Recycle(XrdLink *lp=0, int consec=0, const char *reason=0)=0
virtual int Process(XrdLink *lp)=0
void Schedule(XrdJob *jp)
const char * pident
Trace identifier (originator)
XrdNetAddrInfo * addrInfo
Entity's connection details.
const char * tident
Trace identifier always preset.
char prot[XrdSecPROTOIDSIZE]
Auth protocol used (e.g. krb5)
char * name
Entity's name.
unsigned int ueid
Unique ID of entity instance.
char * host
Entity's host name dnr dependent.
virtual void Connect(const XrdSecEntity *client=0)
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
void Register(const char *Uname, const char *Hname, const char *Pname, unsigned int xSID=0)
void Report(const char *Info)
static XrdXrootdStats * SI
static XrdSysError & eDest
static unsigned int getSID()
XrdXrootdMonitor::User Monitor
int(XrdXrootdProtocol::* Resume)()
static XrdScheduler * Sched
int Process(XrdLink *lp) override
void Recycle(XrdLink *lp, int consec, const char *reason) override
XrdXrootdResponse Response
static XrdBuffManager * BPool
static XrdSfsFileSystem * osFS
void setID(unsigned long long id)
static XrdXrootdTransPend * Remove(XrdLink *lP, short sid)
union XrdXrootdTransPend::@191 Pend
XrdXrootdTransit * bridge
static void Clear(XrdXrootdTransit *trP)
bool Run(const char *xreqP, char *xdataP=0, int xdataL=0)
Inject an xrootd request into the protocol stack.
static const char * ReqTable()
Initialize the valid request table.
void Redrive()
Redrive a request after a wait.
int Send(int rcode, const struct iovec *ioVec, int ioNum, int ioLen)
Handle request data response.
void Recycle(XrdLink *lp, int consec, const char *reason)
Handle link shutdown.
static void Init(XrdScheduler *schedP, int qMax, int qTTL)
Perform one-time initialization.
static int Attn(XrdLink *lP, short *theSID, int rcode, const struct iovec *ioVec, int ioNum, int ioLen)
Handle attention response (i.e. async response)
void Proceed()
Resume processing after a waitresp completion.
bool Disc()
Handle dismantlement.
int Process(XrdLink *lp)
Handle link activation (replaces parent activation).
union XrdXrootd::Bridge::Context::@164 sID
associated request stream ID
virtual int File(Bridge::Context &info, int dlen)=0
virtual bool Data(Bridge::Context &info, const struct iovec *iovP, int iovN, int iovL, bool final)=0
virtual bool Error(Bridge::Context &info, int ecode, const char *etext)=0
virtual bool Done(Bridge::Context &info)=0
the result context
virtual bool Redir(Bridge::Context &info, int port, const char *hname)=0
virtual bool Wait(Bridge::Context &info, int wtime, const char *wtext)
virtual void Free(Bridge::Context &info, char *buffP, int buffL)
static const int uIPv4
ucap: Supports read redirects