XRootD
Loading...
Searching...
No Matches
XrdXrootdNormAio.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d X r o o t d N o r m A i o . c c */
4/* */
5/* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cerrno>
32#include <cstdio>
33#include <sys/uio.h>
34
35#include "Xrd/XrdLink.hh"
36#include "Xrd/XrdScheduler.hh"
38#include "XrdSys/XrdSysError.hh"
45
46#define TRACELINK dataLink
47
48/******************************************************************************/
49/* G l o b a l S t a t i c s */
50/******************************************************************************/
51
53
// Objects declared here are defined in another translation unit. In this
// file eLog is used by CopyL2F() to log write logic errors.
//
54namespace XrdXrootd
55{
56extern XrdSysError eLog;
57extern XrdScheduler *Sched;
58}
// Make the names above usable without the XrdXrootd:: qualifier.
59using namespace XrdXrootd;
60
61/******************************************************************************/
62/* S t a t i c M e m b e r s */
63/******************************************************************************/
64
// Class-level trace identifier (presumably picked up by the TRACEP macro used
// throughout this file -- confirm against the trace header).
65const char *XrdXrootdNormAio::TraceID = "NormAio";
66
67/******************************************************************************/
68/* L o c a l S t a t i c s */
69/******************************************************************************/
70
71namespace
72{
73XrdSysMutex fqMutex;
74XrdXrootdNormAio *fqFirst = 0;
75int numFree = 0;
76
77static const int maxKeep = 64; // Keep in reserve
78}
79
80/******************************************************************************/
81/* A l l o c */
82/******************************************************************************/
83
86 XrdXrootdFile *fP)
87{
// Hand out an initialized request object, reusing one from the free queue
// when available and allocating a new one otherwise.
// NOTE(review): the listing skips source lines 84-85, which presumably hold
// the start of this function's signature -- confirm against upstream.
//
88 XrdXrootdNormAio *reqP;
89
90// Take the first object off the free queue, if any, while holding fqMutex
91//
92 fqMutex.Lock();
93 if ((reqP = fqFirst))
94 {fqFirst = reqP->nextNorm;
95 numFree--;
96 }
97 fqMutex.UnLock();
98
99// The free queue was empty, so create a new object
100//
101 if (!reqP) reqP = new XrdXrootdNormAio;
102
103// Initialize the object, detach it from any queue chain and return it
104//
105 reqP->Init(protP, resp, fP);
106 reqP->nextNorm = 0;
107 return reqP;
108}
109
110/******************************************************************************/
111/* Private: C o p y F 2 L _ A d d 2 Q */
112/******************************************************************************/
113
// Start an aio read for the next piece of the data that remains to be read.
//
// aioP  - buffer to reuse for the read; when nil one is allocated here.
//
// Returns true when the caller may carry on (a read was started, nothing was
// left to read, or no buffer was available but reads are still in flight).
// Returns false after an error has been reported via SendError/SendFSError.
//
114bool XrdXrootdNormAio::CopyF2L_Add2Q(XrdXrootdAioBuff *aioP)
115{
116 int dlen, rc;
117
118// Dispatch the requested number of aio requests if we have enough data
119//
120 if (dataLen > 0)
// Failing to get a buffer is only reported as an error when nothing is in
// flight; otherwise we return success without starting a read.
121 {if (!aioP && !(aioP = XrdXrootdAioBuff::Alloc(this)))
122 {if (inFlight) return true;
123 SendError(ENOMEM, "insufficient memory");
124 return false;
125 }
// NOTE(review): the listing skips source line 126; a statement may be missing
// here (nothing visible sets sfsAio.aio_offset) -- confirm against upstream.
// Read a full buffer if that much data remains, else only what is left.
127 if (dataLen >= (int)aioP->sfsAio.aio_nbytes)
128 dlen = aioP->sfsAio.aio_nbytes;
129 else dlen = aioP->sfsAio.aio_nbytes = dataLen;
130
// Start the read; on failure report it and give the buffer back.
131 if ((rc = dataFile->XrdSfsp->read((XrdSfsAio *)aioP)) != SFS_OK)
132 {SendFSError(rc);
133 aioP->Recycle();
134 return false;
135 }
136 inFlight++;
137 TRACEP(FSAIO, "aioR beg " <<dlen <<'@' <<dataOffset
138 <<" inF=" <<int(inFlight));
// Advance past the piece just requested.
139 dataOffset += dlen;
140 dataLen -= dlen;
// Once every read has been issued, schedule Protocol on the file's aioFob and
// record that in aioState so CopyF2L() does not schedule it again.
141 if (dataLen <= 0)
142 {dataFile->aioFob->Schedule(Protocol);
143 aioState |= aioSchd;
144 }
145 }
146 return true;
147}
148
149/******************************************************************************/
150/* Private: C o p y F 2 L */
151/******************************************************************************/
152
// Drive a read request: collect completed aio buffers, send their data to the
// client in offset order and keep issuing new reads via CopyF2L_Add2Q() until
// the request has finished or failed. The method ends by recycling this
// object, after a Drain() when reads are still in flight.
//
153void XrdXrootdNormAio::CopyF2L()
154{
155 XrdXrootdAioBuff *aioP;
156 bool aOK = true;
157
158// Pick a finished element off the pendQ. Wait for an outstanding buffer if we
159// reached our buffer limit. Otherwise, ask for a return if we can start anew.
160// Note: We asked getBuff() if it returns nil to not release the lock.
// NOTE(review): the listing skips source line 162, which presumably opens the
// do-loop closed at source line 230 and declares doWait -- confirm upstream.
161//
163 if (!(aioP = getBuff(doWait)))
164 {if (isDone || !CopyF2L_Add2Q()) break;
165 continue;
166 }
167
168// Step 1: do some tracing
169//
170 TRACEP(FSAIO,"aioR end "<<aioP->sfsAio.aio_nbytes
171 <<'@'<<aioP->sfsAio.aio_offset
172 <<" result="<<aioP->Result<<" D-S="<<isDone<<'-'<<int(Status)
173 <<" inF="<<int(inFlight));
174
175// Step 2: Validate this buffer. A rejected buffer is recycled here unless it
// is the one held in finalRead, which is recycled during final cleanup.
176//
177 if (!Validate(aioP))
178 {if (aioP != finalRead) aioP->Recycle();
179 continue;
180 }
181
182// Step 3: Since blocks may come back out of order we need to make sure we are
183// sending them in proper order with no gaps. A block that is not the
// next one expected is inserted into sendQ, kept sorted by offset.
184//
185 if (aioP->sfsAio.aio_offset != sendOffset && !isDone)
186 {XrdXrootdAioBuff *bP = sendQ, *bPP = 0;
187 while(bP)
188 {if (aioP->sfsAio.aio_offset < bP->sfsAio.aio_offset) break;
189 bPP = bP; bP = bP->next;
190 }
191 aioP->next = bP;
192 if (bPP) bPP->next = aioP;
193 else sendQ = aioP;
194 reorders++;
195 TRACEP(FSAIO,"aioR inQ "<<aioP->Result<<'@'<<aioP->sfsAio.aio_offset);
196 continue;
197 }
198
199// Step 4: If this is the last block to be read then establish the actual
200// last block to be used for final status to avoid an extra response.
// With an empty sendQ that is this block; otherwise it is the tail
// of sendQ, which is unlinked from the queue here.
201//
202 if (inFlight == 0 && dataLen == 0 && !finalRead)
203 {if (!sendQ)
204 {finalRead = aioP;
205 break;
206 } else {
207 XrdXrootdAioBuff *bP = sendQ, *bPP = 0;
208 while(bP->next) {bPP = bP; bP = bP->next;}
209 if (bPP) {finalRead = bP; bPP->next = 0;}
210 else {finalRead = sendQ; sendQ = 0;}
211 }
212 }
213
214// Step 5: Send the data to the client and if successful, see if we need to
215// schedule more data to be read from the data source. The buffer is
// reused for the next read only when the send worked and data
// remains to be read; otherwise it is recycled.
216//
217 if (isDone || !Send(aioP) || dataLen <= 0) aioP->Recycle();
218 else if (!CopyF2L_Add2Q(aioP)) break;
219
220// Step 6: Now send any queued messages that are eligible to be sent, i.e.
// those at the head of sendQ whose offset equals sendOffset.
221//
222 while(sendQ && sendQ->sfsAio.aio_offset == sendOffset && aOK)
223 {aioP = sendQ;
224 sendQ = sendQ->next;
225 TRACEP(FSAIO,"aioR deQ "<<aioP->Result<<'@'<<aioP->sfsAio.aio_offset);
226 if (!isDone && Send(aioP) && dataLen) aOK = CopyF2L_Add2Q(aioP);
227 else aioP->Recycle();
228 }
229
230 } while(inFlight > 0 && aOK);
231
232// If we are here then the request has finished. If all went well,
233// fire off the final response. Blocks still left in sendQ mean a gap exists
// at sendOffset, which is reported to the client as an error.
234//
235 if (!isDone)
236 {if (sendQ)
237 {char ebuff[80];
238 snprintf(ebuff, sizeof(ebuff), "aio read failed at offset %lld; "
239 "missing data", static_cast<long long>(sendOffset));
240 SendError(ENODEV, ebuff);
241 } else Send(finalRead, true);
242 }
243
244// Cleanup anything left over
245//
246 if (finalRead) finalRead->Recycle();
247 while((aioP = sendQ)) {sendQ = sendQ->next; aioP->Recycle();}
248
249// If we encountered a fatal link error then cancel any pending aio reads on
250// this link. Otherwise if we have not yet scheduled the next aio, do so.
251//
252 if (aioState & aioDead) dataFile->aioFob->Reset(Protocol);
253 else if (!(aioState & aioSchd)) dataFile->aioFob->Schedule(Protocol);
254
255// Do a quick drain if something is still in flight for logging purposes.
256// If the quick drain wasn't successful, then draining will be done in
257// the background; which, of course, might never complete. Otherwise, recycle.
258//
259 if (!inFlight) Recycle(true);
260 else Recycle(Drain());
261}
262
263/******************************************************************************/
264/* Private: C o p y L 2 F */
265/******************************************************************************/
266
// Drive a write request: obtain a buffer, have Protocol->getData() fill it
// from the link and pass it to CopyL2F(aioP) to start the aio write, repeating
// while writes are in flight.
//
// Returns the non-zero getData() result when one is returned, -1 when the
// final response could not be sent, and 0 in all other cases.
//
267int XrdXrootdNormAio::CopyL2F()
268{
269 XrdXrootdAioBuff *aioP;
270 int dLen, rc;
271
272// Pick a finished element off the pendQ. If there are no elements then get
273// a new one if we can. Otherwise, we will have to wait for one to come back.
274// Unlike read() writes are bound to a socket and we cannot reliably
275// give up the thread by returning to level 0.
// NOTE(review): the listing skips source line 277, which presumably opens the
// do-loop closed at source line 329 and declares doWait -- confirm upstream.
276//
278 if (!(aioP = getBuff(doWait)))
279 {if (isDone) return 0;
280 if (!(aioP = XrdXrootdAioBuff::Alloc(this)))
281 {SendError(ENOMEM, "insufficient memory");
282 return 0;
283 }
284 } else {
285
286 TRACEP(FSAIO, "aioW end "<<aioP->sfsAio.aio_nbytes<<'@'
287 <<aioP->sfsAio.aio_offset<<" result="<<aioP->Result
288 <<" D-S="<<isDone<<'-'<<int(Status)<<" inF="<<int(inFlight));
289
290// If the aio failed, send an error. A zero Result is treated as a failure too.
291//
292 if (aioP->Result <= 0)
293 {SendError(-aioP->Result, 0);
294 aioP->Recycle();
295 return 0; // Caller will drain
296 }
297
298// If we have no data or status was posted, ignore the result
299//
300 if (dataLen <= 0 || isDone)
301 {aioP->Recycle();
302 continue;
303 }
304 }
305
306// Setup the aio object: use a full buffer if that much data remains, else
// only what is left, and advance past the piece being handled.
// NOTE(review): the listing skips source line 308; a statement may be missing
// here (nothing visible sets sfsAio.aio_offset) -- confirm against upstream.
307//
309 if (dataLen >= (int)aioP->sfsAio.aio_nbytes)
310 dLen = aioP->sfsAio.aio_nbytes;
311 else dLen = aioP->sfsAio.aio_nbytes = dataLen;
312 dataOffset += dLen;
313 dataLen -= dLen;
314
315// Issue the read to get the data into the buffer. A non-zero result ends this
// call: a positive one remembers the buffer in pendWrite, a negative
// one recycles the buffer and abandons the remaining data.
316//
317 if ((rc = Protocol->getData(this,"aiowr",(char *)aioP->sfsAio.aio_buf,dLen)))
318 {if (rc > 0) pendWrite = aioP;
319 else {aioP->Recycle(); // rc must be < 0!
320 dataLen = 0;
321 }
322 return rc;
323 }
324
325// Complete the write operation
326//
327 if (!CopyL2F(aioP)) return 0;
328
329 } while(inFlight);
330
331// If we finished successfully, send off final response otherwise it's an error.
332//
333 if (!isDone)
334 {if (!dataLen) return (Send(0) ? 0 : -1);
335 SendError(EIDRM, "aioWrite encountered an impossible condition");
336 eLog.Emsg("NormAio", "write logic error for",
337 dataLink->ID, dataFile->FileKey);
338 }
339
340// Cleanup as we don't know where we will return
341//
342 return 0;
343}
344
345/******************************************************************************/
346
347bool XrdXrootdNormAio::CopyL2F(XrdXrootdAioBuff *aioP)
348{
349
350// Write out the data
351//
352 int rc = dataFile->XrdSfsp->write((XrdSfsAio *)aioP);
353 if (rc != SFS_OK)
354 {SendFSError(rc);
355 aioP->Recycle();
356 return false;
357 }
358
359// Do some tracing and return
360//
361 inFlight++;
362 TRACEP(FSAIO,"aioW beg "<<aioP->sfsAio.aio_nbytes <<'@'
363 <<aioP->sfsAio.aio_offset <<" inF=" <<int(inFlight));
364 return true;
365}
366
367/******************************************************************************/
368/* D o I t */
369/******************************************************************************/
370
371// This method is invoked when we have run out of aio objects but have inflight
372// objects during reading. In that case, we must relinquish the thread. When an
373// aio object completes it will reschedule this object on a new thread.
// NOTE(review): the listing skips source line 375, which presumably holds the
// function signature -- confirm against upstream.
374
376{
377// Reads run disconnected as they will never read from the link. Nothing is
// done here when the aioRead flag is not set in aioState.
378//
379 if (aioState & aioRead) CopyF2L();
380}
381
382/******************************************************************************/
383/* R e a d */
384/******************************************************************************/
385
// Set up an aio read of dlen bytes starting at file offset offs and schedule
// this object on the file's aioFob to carry it out.
//
// offs  - file offset at which reading starts.
// dlen  - number of bytes to read.
//
386void XrdXrootdNormAio::Read(long long offs, int dlen)
387{
388
389// Setup the copy from the file to the network
// NOTE(review): the listing skips source line 393; a statement may be missing
// after the two assignments below -- confirm against upstream.
390//
391 dataOffset = highOffset = sendOffset = offs;
392 dataLen = dlen;
394
395// Reads run disconnected and are self-terminating, so we need to increase the
396// refcount for the link we will be using to prevent it from disappearing.
397// Recycle will decrement it but does so only for reads. We always update
398// the file refcount and increase the request count.
399//
400 dataLink->setRef(1);
401 dataFile->Ref(1);
402 Protocol->aioUpdReq(1);
403
404// Schedule ourselves to run this asynchronously and return
405//
406 dataFile->aioFob->Schedule(this);
407}
408
409/******************************************************************************/
410/* R e c y c l e */
411/******************************************************************************/
412
414{
// Release the bookkeeping taken for this request and, when release is true,
// put the object on the free queue or delete it. When release is false the
// object is neither queued nor deleted.
// NOTE(review): the listing skips source line 413, which presumably holds the
// function signature -- confirm against upstream.
//
415// Update request count, file and link reference count. The file and link
// references are dropped only for reads. The aioHeld flag is set afterwards
// so a later call does not repeat these updates.
416//
417 if (!(aioState & aioHeld))
418 {Protocol->aioUpdReq(-1);
419 if (aioState & aioRead)
420 {dataFile->Ref(-1);
421 dataLink->setRef(-1);
422 }
423 aioState |= aioHeld;
424 }
425
426// Do some tracing and reset reorder counter
427//
428 TRACEP(FSAIO,"aio"<<(aioState & aioRead ? 'R' : 'W')<<" recycle"
429 <<(release ? "" : " hold")<<"; reorders="<<reorders
430 <<" D-S="<<isDone<<'-'<<int(Status));
431 reorders = 0;
432
433// Place the object on the free queue if possible. When the queue already
// holds maxKeep objects the object is deleted instead; the mutex is released
// before the delete.
434//
435 if (release)
436 {fqMutex.Lock();
437 if (numFree >= maxKeep)
438 {fqMutex.UnLock();
439 delete this;
440 } else {
441 nextNorm = fqFirst;
442 fqFirst = this;
443 numFree++;
444 fqMutex.UnLock();
445 }
446 }
447}
448
449/******************************************************************************/
450/* Private: S e n d */
451/******************************************************************************/
452
453bool XrdXrootdNormAio::Send(XrdXrootdAioBuff *aioP, bool final)
454{
455 XResponseType code = (final ? kXR_ok : kXR_oksofar);
456 int rc;
457
458// Send the data (note that no data means it's a finalresponse)
459//
460 if (aioP)
461 {rc = Response.Send(code,(void*)aioP->sfsAio.aio_buf,aioP->Result);
462 sendOffset = aioP->sfsAio.aio_offset + aioP->Result;
463 } else rc = Response.Send();
464
465// Diagnose any errors
466//
467 if (rc || final)
468 {isDone = true;
469 dataLen = 0;
470 if (rc) aioState |= aioDead;
471 }
472 return rc == 0;
473}
474
475/******************************************************************************/
476/* W r i t e */
477/******************************************************************************/
478
// Set up an aio write of dlen bytes starting at file offset offs and start it
// on the calling thread.
//
// offs  - file offset at which writing starts.
// dlen  - number of bytes to write.
//
// Returns whatever gdDone() returns.
//
479int XrdXrootdNormAio::Write(long long offs, int dlen)
480{
481// Update request count. Note that dataLink and dataFile references are
482// handled outboard as writes are inextricably tied to the data link.
483//
484 Protocol->aioUpdReq(1);
485
486// Setup the copy from the network to the file
// NOTE(review): the listing skips source line 488; a statement may be missing
// before the assignments below -- confirm against upstream.
487//
489 dataOffset = highOffset = offs;
490 dataLen = dlen;
491
492// Since this thread can't do anything else since it's blocked by the socket
493// we simply initiate the write operation via a simulated getData() callback.
494//
495 return gdDone();
496}
XResponseType
Definition XProtocol.hh:898
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_ok
Definition XProtocol.hh:899
XrdOucTrace * XrdXrootdTrace
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
void * aio_buf
Definition XrdSfsAio.hh:47
#define SFS_OK
#define TRACEP(act, x)
ssize_t Result
Definition XrdSfsAio.hh:65
virtual void Recycle()=0
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdXrootdAioBuff * next
virtual void Recycle() override
static XrdXrootdAioBuff * Alloc(XrdXrootdAioTask *arp)
int gdDone() override
XrdXrootdFile * dataFile
bool Validate(XrdXrootdAioBuff *aioP)
XrdXrootdAioBuff * getBuff(bool wait)
void SendError(int rc, const char *eText)
XrdXrootdResponse Response
void Init(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
void SendFSError(int rc)
static const int aioRead
static const int aioSchd
static const int aioHeld
RAtomic_uchar inFlight
static const int aioDead
XrdXrootdProtocol * Protocol
void DoIt() override
int Write(long long offs, int dlen) override
void Recycle(bool release) override
void Read(long long offs, int dlen) override
static XrdXrootdNormAio * Alloc(XrdXrootdProtocol *protP, XrdXrootdResponse &resp, XrdXrootdFile *fP)
XrdScheduler * Sched
XrdSysError eLog