XRootD
Loading...
Searching...
No Matches
XrdCmsClientMan.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d C m s C l i e n t M a n . c c */
4/* */
5/* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <ctime>
32
35#include "XrdCms/XrdCmsLogin.hh"
36#include "XrdCms/XrdCmsTrace.hh"
37
39
40#include "XrdSys/XrdSysError.hh"
41#include "XrdSys/XrdSysTimer.hh"
42
43#include "Xrd/XrdInet.hh"
44#include "Xrd/XrdLink.hh"
45
46using namespace XrdCms;
47
48/******************************************************************************/
49/* G l o b a l s */
50/******************************************************************************/
51
// Buffer pool shared by all manager objects; the constructor and relayResp()
// draw XrdOucEI::Max_Error_Len-sized network buffers from it.
52XrdOucBuffPool XrdCmsClientMan::BuffPool(XrdOucEI::Max_Error_Len, 65536, 1, 16);
53
// Network object Hookup() uses to connect to the manager. It starts out null
// here; NOTE(review): presumably assigned during configuration elsewhere.
54XrdInet *XrdCmsClientMan::Network = 0;
55
57
// Configuration file name; not referenced by the code visible in this file.
58const char *XrdCmsClientMan::ConfigFN = 0;
59
// Serializes updates to the shared doDebug bit mask (see Hookup()).
60XrdSysMutex XrdCmsClientMan::manMutex;
61
62/******************************************************************************/
63/* C o n s t r u c t o r */
64/******************************************************************************/
65
// Constructor. NOTE(review): the first line of this signature (naming the
// class and the host/port parameters) is missing from this listing.
//
// host  manager host name; copied into Host, and the text before its first
//       '.' is copied into HPfx (used as the short name in replies/traces).
// port  manager port.
// cw    derives the reconnect delay `dally` = cw/2 - 1 seconds, clamped to
//       [3, 10]; Hookup() and Start() sleep this long between attempts.
// nr    stored as nrMax: whatsUp() marks the manager inactive after more than
//       nrMax silent intervals.
// rw    stored as repWMax, the upper bound for the reply wait repWait.
// rd    bounds the client delay computed by whatsUp() to [rd, 3*rd].
67 int cw, int nr, int rw, int rd)
68 : syncResp(0)
69{
70 static XrdSysMutex initMutex;
71 static int Instance = 0;
72 char *dot;
73
74 Host = strdup(host);
75 if ((dot = index(Host, '.')))
76 {*dot = '\0'; HPfx = strdup(Host); *dot = '.';}
77 else HPfx = strdup(Host);
78 Port = port;
79 Link = 0;
80 Active = 0;
81 Silent = 0;
// Start out suspended; Hookup() sets the real state from the login reply.
82 Suspend = 1;
83 RecvCnt = 0;
84 nrMax = nr;
// Receive buffer for incoming payloads, taken from the shared pool.
85 NetBuff = BuffPool.Alloc(XrdOucEI::Max_Error_Len);
86 repWMax = rw;
// Real reply wait is computed by Hookup() once connected.
87 repWait = 0;
88 minDelay= rd;
89 maxDelay= rd*3;
90 chkCount= chkVal;
91 lastUpdt= lastTOut = time(0);
92 Next = 0;
// Connection instance number; Send() and whatsUp() advance it whenever they
// force the link closed.
93 manInst = 1;
94
95// Compute dally value
96//
97 dally = cw / 2 - 1;
98 if (dally < 3) dally = 3;
99 else if (dally > 10) dally = 10;
100
101// Provide a unique mask number for this manager
102//
// The bit is set/cleared in the shared doDebug mask by Hookup().
// NOTE(review): doDebug is declared `static char` (per this page's
// cross-reference), so only the first eight instances get a bit that fits in
// it, and the shift overflows int past 31 instances -- confirm the expected
// number of managers.
103 initMutex.Lock();
104 manMask = 1<<Instance++;
105 initMutex.UnLock();
106}
107
108/******************************************************************************/
109/* D e s t r u c t o r */
110/******************************************************************************/
111
// Destructor: closes the manager link if one is open, frees both copies of
// the host name, and recycles the network buffer back to its pool.
// NOTE(review): the destructor's signature line is missing from this listing.
113{
114 if (Link) Link->Close();
115 if (Host) free(Host);
116 if (HPfx) free(HPfx);
117 if (NetBuff) NetBuff->Recycle();
118}
119
120/******************************************************************************/
121/* d e l a y R e s p */
122/******************************************************************************/
123
125{
126 XrdCmsResp *rp;
127 int msgid;
128
129// Obtain the message ID
130//
131 if (!(msgid = Resp.getErrInfo()))
132 {Say.Emsg("Manager", Host, "supplied invalid waitr msgid");
133 Resp.setErrInfo(EILSEQ, "redirector protocol error");
134 syncResp.Post();
135 return SFS_ERROR;
136 }
137
138// Allocate a delayed response object
139//
140 if (!(rp = XrdCmsResp::Alloc(&Resp, msgid)))
141 {Say.Emsg("Manager",ENOMEM,"allocate resp object for",Resp.getErrUser());
142 Resp.setErrInfo(0, "0");
143 syncResp.Post();
144 return SFS_STALL;
145 }
146
147// Add this object to our delayed response queue. If the manager bounced then
148// purge all of the pending repsonses to avoid sending wrong ones.
149//
150 if (msgid < maxMsgID) RespQ.Purge();
151 maxMsgID = msgid;
152 RespQ.Add(rp);
153
154// Tell client to wait for response. The semaphore post allows the manager
155// to get the next message from the cmsd. This prevents us from getting the
156// delayed response before the response object is added to the queue.
157//
158 Resp.setErrInfo(0, "");
159 syncResp.Post();
160 return SFS_STARTED;
161}
162
163/******************************************************************************/
164/* S e n d */
165/******************************************************************************/
166
167int XrdCmsClientMan::Send(unsigned int &iMan, char *msg, int mlen)
168{
169 int allok = 0;
170
171// Determine message length
172//
173 if (!mlen) mlen = strlen(msg);
174
175// Send the request
176//
177 myData.Lock();
178 iMan = manInst;
179 if (Active)
180 {if (Link)
181 {if (!(allok = Link->Send(msg, mlen) > 0))
182 {Active = 0;
183 Link->Close(1);
184 manInst++;
185 } else SendCnt++;
186 }
187 }
188 myData.UnLock();
189
190// All done
191//
192 return allok;
193}
194
195/******************************************************************************/
196
197int XrdCmsClientMan::Send(unsigned int &iMan, const struct iovec *iov, int iovcnt, int iotot)
198{
199 int allok = 0;
200
201// Send the request
202//
203 myData.Lock();
204 iMan = manInst;
205 if (Active)
206 {if (Link)
207 {if (!(allok = Link->Send(iov, iovcnt, iotot) > 0))
208 {Active = 0;
209 Link->Close(1);
210 manInst++;
211 } else SendCnt++;
212 }
213 }
214 myData.UnLock();
215
216// All done
217//
218 return allok;
219}
220
221/******************************************************************************/
222/* S t a r t */
223/******************************************************************************/
224
// Start: main loop for this manager object; it never returns.
// NOTE(review): the signature line is missing from this listing; the final
// return statement shows the function returns void *.
//
// Each pass connects and logs in (Hookup()), then reads messages until
// Receive() returns 0. Messages are dispatched as follows:
//   - kYR_async modifier -> relayResp()  (completes a queued delayed response)
//   - kYR_status code    -> setStatus()  (suspend/resume)
//   - anything else      -> XrdCmsClientMsg::Reply(); if that accepts it and
//                           the code is kYR_waitresp, block until syncResp is
//                           posted (delayResp() posts it on every path).
// When Receive() fails, the link is closed and the object is marked inactive
// and suspended. The disconnect is logged, and the loop sleeps `dally` seconds
// before reconnecting.
226{
227
228// First step is to connect to the manager
229//
230 do {Hookup();
231 // Now simply start receiving messages on the stream. When we get a
232 // respwait reply then we must be assured that the object representing
233 // the request is added to the queue before the actual reply arrives.
234 // We do this by waiting on syncResp which is posted once the request
235 // object is fully processed. The actual response associated with the
236 // respwait is synchronized during the callback phase since the client
237 // must receive the respwait before the subsequent response.
238 //
239 while(Receive())
240 if (Response.modifier & CmsResponse::kYR_async) relayResp();
241 else if (Response.rrCode == kYR_status) setStatus();
242 else if (XrdCmsClientMsg::Reply(HPfx, Response, NetBuff))
243 {if (Response.rrCode == kYR_waitresp) syncResp.Wait();}
244
245 // Tear down the connection
246 //
247 myData.Lock();
248 if (Link) {Link->Close(); Link = 0;}
249 Active = 0; Suspend = 1;
250 myData.UnLock();
251
252 // Indicate the problem
253 //
254 Say.Emsg("ClientMan", "Disconnected from", Host);
255 XrdSysTimer::Snooze(dally);
256 } while(1);
257
258// We should never get here
259//
260 return (void *)0;
261}
262
263/******************************************************************************/
264/* w h a t s U p */
265/******************************************************************************/
266
267int XrdCmsClientMan::whatsUp(const char *user, const char *path,
268 unsigned int iMan)
269{
270 EPNAME("whatsUp");
271 unsigned int xMan;
272 int theDelay, inQ;
273 bool lClose = false;
274
275// The cmsd did not respond. Increase silent count and see if restart is needed
276// Otherwise, increase the wait interval just in case things are just slow.
277//
278 myData.Lock();
279 if (Active)
280 {if (Active == RecvCnt)
281 {if ((time(0)-lastTOut) >= repWait)
282 {Silent++;
283 if (Silent > nrMax)
284 {Active = 0; Silent = 0; Suspend = 1;
285 if (Link && iMan == manInst)
286 {Link->Close(1);
287 manInst++; lClose = true;
288 }
289 } else if (Silent & 0x02 && repWait < repWMax) repWait++;
290 }
291 } else {Active = RecvCnt; Silent = 0; lastTOut = time(0);}
292 }
293
294// Calclulate how long to delay the client. This will be based on the number
295// of outstanding requests bounded by the config delay value.
296//
297 inQ = XrdCmsClientMsg::inQ();
298 theDelay = inQ * qTime;
299 xMan = manInst;
300 myData.UnLock();
301 theDelay = theDelay/1000 + (theDelay % 1000 ? 1 : 0);
302 if (theDelay < minDelay) theDelay = minDelay;
303 if (theDelay > maxDelay) theDelay = maxDelay;
304
305// Do Some tracing here
306//
307 TRACE(Redirect, user <<" no resp from inst " <<iMan <<" of "<<HPfx
308 <<" in " <<repWait
309 <<" inst " <<xMan <<(lClose ? " closed" : " steady")
310 <<"; inQ " <<inQ <<" delay " <<theDelay <<" path=" <<path);
311 return theDelay;
312}
313
314/******************************************************************************/
315/* P r i v a t e M e t h o d s */
316/******************************************************************************/
317/******************************************************************************/
318/* H o o k u p */
319/******************************************************************************/
320
// Hookup: connect to and log in to the manager, retrying until it succeeds,
// then reset this object's link state. Always returns 1.
//
// Connection attempts go through Network->Connect(). After each failure the
// thread sleeps `dally` seconds. The first attempt uses opts = 0. The next
// twelve use XRDNET_NOEMSG, which presumably suppresses connect error
// messages. The cycle then repeats.
// If XRDHOST is set, "myHN=<value>" is sent as login environment data.
// After login this routine:
//   - sets this manager's doDebug bit when the reply Mode has kYR_debug;
//   - takes the suspend state from the reply Mode's kYR_suspend bit;
//   - derives repWait (seconds) and qTime (milliseconds) from Data.HoldTime.
321int XrdCmsClientMan::Hookup()
322{
323 EPNAME("Hookup");
324 CmsLoginData Data;
325 XrdLink *lp;
326 char buff[256], hnBuff[264];
327 kXR_char *envData = 0;
328 int rc, oldWait, tries = 12, opts = 0;
329
330// Turn off our debugging and version flags
331//
// (Only this manager's bit in doDebug is cleared here.)
332 manMutex.Lock();
333 doDebug &= ~manMask;
334 manMutex.UnLock();
335
336// Report our hostname (there are better ways of doing this)
337//
338 const char *hn = getenv("XRDHOST");
339 if (hn)
340 {snprintf(hnBuff, sizeof(hnBuff), "myHN=%s", hn);
341 envData = (kXR_char *)hnBuff;
342 }
343
344// Keep trying to connect to the manager. Note that we bind the link to this
345// thread to make sure we get notified should another thread close the socket.
// NOTE(review): the Bind() call below is commented out, so the link is not
// actually bound to this thread despite the comment above.
346//
347 do {while(!(lp = Network->Connect(Host, Port, opts)))
348 {XrdSysTimer::Snooze(dally);
349 if (tries--) opts = XRDNET_NOEMSG;
350 else {opts = 0; tries = 12;}
351 continue;
352 }
353// lp->Bind(XrdSysThread::ID());
354 memset(&Data, 0, sizeof(Data));
355 Data.envCGI = envData;
// NOTE(review): listing line 356 is missing from this page; a statement that
// sets another login field probably sat here -- check the real source.
// HoldTime is sent carrying our pid. After Login() it is read as the manager's
// hold time, so presumably Login() overwrites it with the reply -- confirm.
357 Data.HoldTime = static_cast<int>(getpid());
358 if (!(rc = XrdCmsLogin::Login(lp, Data))) break;
359 lp->Close();
360 XrdSysTimer::Snooze(dally);
361 } while(1);
362
363// Establish global state
364//
365 manMutex.Lock();
366 doDebug |= (Data.Mode & CmsLoginData::kYR_debug ? manMask : 0);
367 manMutex.UnLock();
368
369// All went well, finally
370//
371 myData.Lock();
372 Link = lp;
373 Active = 1;
374 Silent = 0;
375 RecvCnt = 1;
376 SendCnt = 1;
377 Suspend = (Data.Mode & CmsLoginData::kYR_suspend);
378
379// Calculate how long we will wait for replies before delaying the client.
380// This is computed dynamically based on the expected response window.
// repWait = ceil(3 * HoldTime / 1000) seconds, bounded above by repWMax and
// below by max(2, 20% of the previous repWait). A HoldTime that is
// non-positive or above repWMax*1000 selects repWMax directly.
381//
382 if ((oldWait = (repWait*20/100)) < 2) oldWait = 2;
383 if (Data.HoldTime > repWMax*1000) repWait = repWMax;
384 else if (Data.HoldTime <= 0) repWait = repWMax;
385 else {repWait = Data.HoldTime*3;
386 repWait = (repWait/1000) + (repWait % 1000 ? 1 : 0);
387 if (repWait > repWMax) repWait = repWMax;
388 else if (repWait < oldWait) repWait = oldWait;
389 }
// Per-request queue time in milliseconds (at least 100); whatsUp() multiplies
// it by the number of outstanding requests.
390 qTime = (Data.HoldTime < 100 ? 100 : Data.HoldTime);
391 lastTOut = time(0);
392 myData.UnLock();
393
394// Tell the world
395//
396 sprintf(buff, "v %d", Data.Version);
397 Say.Emsg("ClientMan", (Suspend ? "Connected to suspended" : "Connected to"),
398 Host, buff);
399 DEBUG(Host <<" qt=" <<qTime <<"ms rw=" <<repWait);
400 return 1;
401}
402
403/******************************************************************************/
404/* R e c e i v e */
405/******************************************************************************/
406
407int XrdCmsClientMan::Receive()
408{
409// This method is always run out of the object's main thread. Other threads
410// may call methods that initiate a link reset via a deferred close. We will
411// notice that here because the file descriptor will be closed. This will
412// cause us to return an error and precipitate a connection teardown.
413//
414 EPNAME("Receive")
415 if (Link->RecvAll((char *)&Response, sizeof(Response)) > 0)
416 {int dlen = static_cast<int>(ntohs(Response.datalen));
417 RecvCnt++;
418 DEBUG(Link->Name() <<' ' <<dlen <<" bytes on " <<Response.streamid);
419 if (!dlen) return 1;
420 if ((dlen > NetBuff->BuffSize())
421 && (Response.rrCode != kYR_data || !NetBuff->Resize(dlen)))
422 Say.Emsg("ClientMan", "Excessive msg length from", Host);
423 else {NetBuff->SetLen(dlen);
424 return Link->RecvAll(NetBuff->Buffer(), dlen);
425 }
426 }
427 return 0;
428}
429
430/******************************************************************************/
431/* r e l a y R e s p */
432/******************************************************************************/
433
434void XrdCmsClientMan::relayResp()
435{
436 EPNAME("relayResp");
437 XrdCmsResp *rp;
438
439// Remove the response object from our queue.
440//
441 if (!(rp = RespQ.Rem(Response.streamid)))
442 {DEBUG(Host <<" replied to non-existent request; id=" <<Response.streamid);
443 return;
444 }
445
446// Queue the request for reply (this transfers the network buffer)
447//
448 rp->Reply(HPfx, Response, NetBuff);
449
450// Obtain a new network buffer
451//
452 NetBuff = BuffPool.Alloc(XrdOucEI::Max_Error_Len);
453}
454
455/******************************************************************************/
456/* Private: c h k S t a t u s */
457/******************************************************************************/
458
459int XrdCmsClientMan::chkStatus()
460{
461 static CmsUpdateRequest Updt = {{0, kYR_update, 0, 0}};
462 XrdSysMutexHelper mdMon(myData);
463 time_t nowTime;
464
465// Count down the query count and ask again every 30 seconds
466//
467 if (!chkCount--)
468 {chkCount = chkVal;
469 nowTime = time(0);
470 if ((nowTime - lastUpdt) >= 30)
471 {lastUpdt = nowTime;
472 if (Active) Link->Send((char *)&Updt, sizeof(Updt));
473 }
474 }
475 return Suspend;
476}
477
478/******************************************************************************/
479/* s e t S t a t u s */
480/******************************************************************************/
481
482void XrdCmsClientMan::setStatus()
483{
484 EPNAME("setStatus");
485 const char *State = 0, *Event = "?";
486
487
488 myData.Lock();
489 if (Response.modifier & CmsStatusRequest::kYR_Suspend)
490 {Event = "suspend";
491 if (!Suspend) {Suspend = 1; State = "suspended";}
492 }
493 else if (Response.modifier & CmsStatusRequest::kYR_Resume)
494 {Event = "resume";
495 if (Suspend) {Suspend = 0; State = "resumed";}
496 }
497 myData.UnLock();
498
499 DEBUG(Host <<" sent " <<Event <<" event");
500 if (State) Say.Emsg("setStatus", "Manager", Host, State);
501}
unsigned char kXR_char
Definition XPtypes.hh:65
#define DEBUG(x)
#define EPNAME(x)
#define XRDNET_NOEMSG
Definition XrdNetOpts.hh:71
struct myOpts opts
#define SFS_ERROR
#define SFS_STALL
#define SFS_STARTED
if(ec< 0) ec
#define TRACE(act, x)
Definition XrdTrace.hh:63
int Send(unsigned int &iMan, char *msg, int mlen=0)
int delayResp(XrdOucErrInfo &Resp)
int whatsUp(const char *user, const char *path, unsigned int iMan)
static char doDebug
XrdCmsClientMan(char *host, int port, int cw, int nr, int rw, int rd)
static int Reply(const char *Man, XrdCms::CmsRRHdr &hdr, XrdOucBuffer *buff)
static int Login(XrdLink *Link, XrdCms::CmsLoginData &Data, int timeout=-1)
XrdCmsResp * Rem(int msgid)
void Add(XrdCmsResp *rp)
void Reply(const char *Man, XrdCms::CmsRRHdr &rrhdr, XrdOucBuffer *netbuff)
static XrdCmsResp * Alloc(XrdOucErrInfo *erp, int msgid)
Definition XrdCmsResp.cc:64
XrdLink * Connect(const char *host, int port, int opts=0, int timeout=-1)
Definition XrdInet.cc:185
XrdOucBuffer * Alloc(int sz)
char * Buffer() const
bool Resize(int newsz)
int BuffSize() const
void Recycle()
Recycle the buffer. The buffer may be reused in the future.
void SetLen(int dataL, int dataO=0)
const char * getErrUser()
int setErrInfo(int code, const char *emsg)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
kXR_char modifier
Definition YProtocol.hh:85
@ kYR_waitresp
Definition YProtocol.hh:145
XrdSysError Say
kXR_char rrCode
Definition YProtocol.hh:84
@ kYR_update
Definition YProtocol.hh:115
@ kYR_status
Definition YProtocol.hh:112
static const size_t Max_Error_Len