Submit chunks to be downloaded as independent tasks

pull/132/head
Ryan Kistner 5 years ago
parent f8d67d8c9b
commit 609a66f280

@ -787,9 +787,11 @@ namespace DepotDownloader
}
} );
var semaphore = new SemaphoreSlim( Config.MaxDownloads );
var ioSemaphore = new SemaphoreSlim( Config.MaxDownloads );
var networkSemaphore = new SemaphoreSlim( Config.MaxDownloads );
var files = filesAfterExclusions.Where( f => !f.Flags.HasFlag( EDepotFileFlag.Directory ) ).ToArray();
var tasks = new Task[ files.Length ];
var ioTasks = new Task[ files.Length ];
for ( var i = 0; i < files.Length; i++ )
{
var file = files[ i ];
@ -799,7 +801,7 @@ namespace DepotDownloader
try
{
await semaphore.WaitAsync().ConfigureAwait( false );
await ioSemaphore.WaitAsync().ConfigureAwait( false );
cts.Token.ThrowIfCancellationRequested();
string fileFinalPath = Path.Combine( depot.installDir, file.FileName );
@ -910,99 +912,136 @@ namespace DepotDownloader
}
}
foreach ( var chunk in neededChunks )
{
if ( cts.IsCancellationRequested ) break;
var fileSemaphore = new SemaphoreSlim(1);
string chunkID = Util.EncodeHexString( chunk.ChunkID );
CDNClient.DepotChunk chunkData = null;
var downloadTasks = new Task<DepotManifest.ChunkData>[neededChunks.Count];
for (var x = 0; x < neededChunks.Count; x++)
{
var chunk = neededChunks[x];
while ( !cts.IsCancellationRequested )
var downloadTask = Task.Run(async () =>
{
Tuple<CDNClient.Server, string> connection;
cts.Token.ThrowIfCancellationRequested();
try
{
connection = await cdnPool.GetConnectionForDepot( appId, depot.id, cts.Token );
}
catch ( OperationCanceledException )
{
break;
}
await networkSemaphore.WaitAsync().ConfigureAwait(false);
cts.Token.ThrowIfCancellationRequested();
DepotManifest.ChunkData data = new DepotManifest.ChunkData();
data.ChunkID = chunk.ChunkID;
data.Checksum = chunk.Checksum;
data.Offset = chunk.Offset;
data.CompressedLength = chunk.CompressedLength;
data.UncompressedLength = chunk.UncompressedLength;
string chunkID = Util.EncodeHexString(chunk.ChunkID);
CDNClient.DepotChunk chunkData = null;
try
{
chunkData = await cdnPool.CDNClient.DownloadDepotChunkAsync( depot.id, data,
connection.Item1, connection.Item2, depot.depotKey ).ConfigureAwait( false );
cdnPool.ReturnConnection( connection );
break;
}
catch ( SteamKitWebRequestException e )
{
cdnPool.ReturnBrokenConnection( connection );
while (!cts.IsCancellationRequested)
{
Tuple<CDNClient.Server, string> connection;
try
{
connection = await cdnPool.GetConnectionForDepot(appId, depot.id, cts.Token);
}
catch (OperationCanceledException)
{
break;
}
DepotManifest.ChunkData data = new DepotManifest.ChunkData();
data.ChunkID = chunk.ChunkID;
data.Checksum = chunk.Checksum;
data.Offset = chunk.Offset;
data.CompressedLength = chunk.CompressedLength;
data.UncompressedLength = chunk.UncompressedLength;
try
{
chunkData = await cdnPool.CDNClient.DownloadDepotChunkAsync(depot.id, data,
connection.Item1, connection.Item2, depot.depotKey).ConfigureAwait(false);
cdnPool.ReturnConnection(connection);
break;
}
catch (SteamKitWebRequestException e)
{
cdnPool.ReturnBrokenConnection(connection);
if (e.StatusCode == HttpStatusCode.Unauthorized || e.StatusCode == HttpStatusCode.Forbidden)
{
Console.WriteLine("Encountered 401 for chunk {0}. Aborting.", chunkID);
cts.Cancel();
break;
}
else
{
Console.WriteLine("Encountered error downloading chunk {0}: {1}", chunkID, e.StatusCode);
}
}
catch (TaskCanceledException)
{
Console.WriteLine("Connection timeout downloading chunk {0}", chunkID);
}
catch (Exception e)
{
cdnPool.ReturnBrokenConnection(connection);
Console.WriteLine("Encountered unexpected error downloading chunk {0}: {1}", chunkID, e.Message);
}
}
if ( e.StatusCode == HttpStatusCode.Unauthorized || e.StatusCode == HttpStatusCode.Forbidden )
if (chunkData == null)
{
Console.WriteLine( "Encountered 401 for chunk {0}. Aborting.", chunkID );
Console.WriteLine("Failed to find any server with chunk {0} for depot {1}. Aborting.", chunkID, depot.id);
cts.Cancel();
break;
}
else
// Throw the cancellation exception if requested so that this task is marked failed
cts.Token.ThrowIfCancellationRequested();
try
{
Console.WriteLine( "Encountered error downloading chunk {0}: {1}", chunkID, e.StatusCode );
await fileSemaphore.WaitAsync().ConfigureAwait(false);
fs.Seek((long)chunkData.ChunkInfo.Offset, SeekOrigin.Begin);
fs.Write(chunkData.Data, 0, chunkData.Data.Length);
return chunkData.ChunkInfo;
}
finally
{
fileSemaphore.Release();
}
}
catch ( TaskCanceledException )
{
Console.WriteLine( "Connection timeout downloading chunk {0}", chunkID );
}
catch ( Exception e )
finally
{
cdnPool.ReturnBrokenConnection( connection );
Console.WriteLine( "Encountered unexpected error downloading chunk {0}: {1}", chunkID, e.Message );
networkSemaphore.Release();
}
}
});
if ( chunkData == null )
{
Console.WriteLine( "Failed to find any server with chunk {0} for depot {1}. Aborting.", chunkID, depot.id );
cts.Cancel();
}
downloadTasks[x] = downloadTask;
}
// Throw the cancellation exception if requested so that this task is marked failed
cts.Token.ThrowIfCancellationRequested();
var completedDownloads = await Task.WhenAll(downloadTasks).ConfigureAwait(false);
TotalBytesCompressed += chunk.CompressedLength;
DepotBytesCompressed += chunk.CompressedLength;
TotalBytesUncompressed += chunk.UncompressedLength;
DepotBytesUncompressed += chunk.UncompressedLength;
fs.Dispose();
fs.Seek( ( long )chunk.Offset, SeekOrigin.Begin );
fs.Write( chunkData.Data, 0, chunkData.Data.Length );
foreach (var chunkInfo in completedDownloads)
{
TotalBytesCompressed += chunkInfo.CompressedLength;
DepotBytesCompressed += chunkInfo.CompressedLength;
TotalBytesUncompressed += chunkInfo.UncompressedLength;
DepotBytesUncompressed += chunkInfo.UncompressedLength;
size_downloaded += chunk.UncompressedLength;
size_downloaded += chunkInfo.UncompressedLength;
}
fs.Dispose();
Console.WriteLine( "{0,6:#00.00}% {1}", ( ( float )size_downloaded / ( float )complete_download_size ) * 100.0f, fileFinalPath );
Console.WriteLine("{0,6:#00.00}% {1}", ((float)size_downloaded / (float)complete_download_size) * 100.0f, fileFinalPath);
}
finally
{
semaphore.Release();
ioSemaphore.Release();
}
} );
tasks[ i ] = task;
ioTasks[ i ] = task;
}
await Task.WhenAll( tasks ).ConfigureAwait( false );
await Task.WhenAll(ioTasks).ConfigureAwait(false);
DepotConfigStore.Instance.InstalledManifestIDs[ depot.id ] = depot.manifestId;
DepotConfigStore.Save();

@ -108,7 +108,7 @@ namespace DepotDownloader
ContentDownloader.Config.VerifyAll = HasParameter( args, "-verify-all" ) || HasParameter( args, "-verify_all" ) || HasParameter( args, "-validate" );
ContentDownloader.Config.MaxServers = GetParameter<int>( args, "-max-servers", 20 );
ContentDownloader.Config.MaxDownloads = GetParameter<int>( args, "-max-downloads", 4 );
ContentDownloader.Config.MaxDownloads = GetParameter<int>( args, "-max-downloads", 8 );
ContentDownloader.Config.MaxServers = Math.Max( ContentDownloader.Config.MaxServers, ContentDownloader.Config.MaxDownloads );
ContentDownloader.Config.LoginID = HasParameter( args, "-loginid" ) ? (uint?)GetParameter<uint>( args, "-loginid" ) : null;
@ -341,7 +341,7 @@ namespace DepotDownloader
Console.WriteLine();
Console.WriteLine( "\t-manifest-only\t\t\t- downloads a human readable manifest for any depots that would be downloaded." );
Console.WriteLine( "\t-cellid <#>\t\t\t\t- the overridden CellID of the content server to download from." );
Console.WriteLine( "\t-max-servers <#>\t\t- maximum number of content servers to use. (default: 8)." );
Console.WriteLine( "\t-max-servers <#>\t\t- maximum number of content servers to use. (default: 20)." );
Console.WriteLine( "\t-max-downloads <#>\t\t- maximum number of chunks to download concurrently. (default: 4)." );
Console.WriteLine( "\t-loginid <#>\t\t- a unique 32-bit integer Steam LogonID in decimal, required if running multiple instances of DepotDownloader concurrently." );
}

@ -129,16 +129,19 @@ namespace DepotDownloader
}
public delegate bool WaitCondition();
private object steamLock = new object();
public bool WaitUntilCallback( Action submitter, WaitCondition waiter )
{
while ( !bAborted && !waiter() )
{
submitter();
lock (steamLock)
submitter();
int seq = this.seq;
do
{
WaitForCallbacks();
lock (steamLock)
WaitForCallbacks();
}
while ( !bAborted && this.seq == seq && !waiter() );
}
@ -473,7 +476,7 @@ namespace DepotDownloader
public void TryWaitForLoginKey()
{
if ( logonDetails.Username == null || !ContentDownloader.Config.RememberPassword ) return;
if ( logonDetails.Username == null || !credentials.LoggedOn || !ContentDownloader.Config.RememberPassword ) return;
var totalWaitPeriod = DateTime.Now.AddSeconds( 3 );
@ -584,7 +587,7 @@ namespace DepotDownloader
}
else
{
Console.WriteLine( "Login key was expired. Please enter your password: " );
Console.Write( "Login key was expired. Please enter your password: " );
logonDetails.Password = Util.ReadPassword();
}
}

@ -51,5 +51,5 @@ Parameter | Description
-manifest-only | downloads a human readable manifest for any depots that would be downloaded.
-cellid \<#> | the overridden CellID of the content server to download from.
-max-servers \<#> | maximum number of content servers to use. (default: 8).
-max-downloads \<#> | maximum number of chunks to download concurrently. (default: 4).
-max-downloads \<#> | maximum number of chunks to download concurrently. (default: 8).
-loginid \<#> | a unique 32-bit integer Steam LogonID in decimal, required if running multiple instances of DepotDownloader concurrently.

Loading…
Cancel
Save