36XCpCtx::XCpCtx(
const std::vector<std::string> &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize ) :
37 pUrls( std::deque<std::string>( urls.begin(), urls.end() ) ), pBlockSize( blockSize ),
38 pParallelSrc( parallelSrc ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ),
39 pOffset( 0 ), pFileSize( -1 ), pFileSizeCV( 0 ), pDataReceived( 0 ), pDone( false ),
40 pDoneCV( 0 ), pRefCount( 1 ), pDeleteCV( 0 ), pDelete( false )
49 while( !pSink.IsEmpty() )
60 if( pUrls.empty() )
return false;
68 uint64_t transferRate = -1;
71 std::list<XCpSrc*>::iterator itr;
74 for( itr = pSources.begin() ; itr != pSources.end() ; ++itr )
77 if( src == exclude )
continue;
79 if( src->
HasData() && tmp < transferRate )
86 if( !ret )
return ret;
99 uint64_t blkSize = pBlockSize, offset = pOffset;
100 if( pOffset + blkSize > uint64_t( pFileSize ) )
101 blkSize = pFileSize - pOffset;
104 return std::make_pair( offset, blkSize );
111 if( pFileSize < 0 && size >= 0 )
114 pFileSizeCV.Broadcast();
116 if( pBlockSize > uint64_t( pFileSize ) / pParallelSrc )
117 pBlockSize = pFileSize / pParallelSrc;
119 if( pBlockSize < pChunkSize )
120 pBlockSize = pChunkSize;
126 for( uint8_t i = 0; i < pParallelSrc; ++i )
128 XCpSrc *src =
new XCpSrc( pChunkSize, pParallelChunks, pFileSize,
this );
129 pSources.push_back( src );
132 auto scpy = pSources;
134 for(
auto src: scpy) {
149 log->
Error(
UtilityMsg,
"Failed to initialize (failed to create new threads)" );
159 if( pDataReceived == uint64_t( pFileSize ) )
168 if( GetRunning() == 0 )
180 ci = std::move( *chunk );
203size_t XCpCtx::GetRunning()
206 size_t nbRunning = 0;
207 std::list<XCpSrc*>::iterator itr;
210 for( itr = pSources.begin() ; itr != pSources.end() ; ++ itr)
211 if( (*itr)->IsRunning() )
static Log * GetLog()
Get default log.
void Error(uint64_t topic, const char *format,...)
Report an error.
bool GetNextUrl(std::string &url)
XCpSrc * WeakestLink(XCpSrc *exclude)
void PutChunk(PageInfo *chunk)
XCpCtx(const std::vector< std::string > &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize)
void SetFileSize(int64_t size)
std::pair< uint64_t, uint64_t > GetBlock()
XRootDStatus Initialize()
XRootDStatus GetChunk(XrdCl::PageInfo &ci)
static void DeleteChunk(PageInfo *&chunk)
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint64_t UtilityMsg
const uint16_t suContinue
const uint16_t errNoMoreReplicas
No more replicas to try.
uint32_t GetLength() const
Get the data length.