From 609a66f2801f9091086a30648debc0ec472d6362 Mon Sep 17 00:00:00 2001 From: Ryan Kistner Date: Thu, 13 Aug 2020 22:14:11 -0600 Subject: [PATCH] Submit chunks to be downloaded as independent tasks --- DepotDownloader/ContentDownloader.cs | 169 ++++++++++++++++----------- DepotDownloader/Program.cs | 4 +- DepotDownloader/Steam3Session.cs | 11 +- README.md | 2 +- 4 files changed, 114 insertions(+), 72 deletions(-) diff --git a/DepotDownloader/ContentDownloader.cs b/DepotDownloader/ContentDownloader.cs index d67c2ac6..aa40d00e 100644 --- a/DepotDownloader/ContentDownloader.cs +++ b/DepotDownloader/ContentDownloader.cs @@ -787,9 +787,11 @@ namespace DepotDownloader } } ); - var semaphore = new SemaphoreSlim( Config.MaxDownloads ); + var ioSemaphore = new SemaphoreSlim( Config.MaxDownloads ); + var networkSemaphore = new SemaphoreSlim( Config.MaxDownloads ); + var files = filesAfterExclusions.Where( f => !f.Flags.HasFlag( EDepotFileFlag.Directory ) ).ToArray(); - var tasks = new Task[ files.Length ]; + var ioTasks = new Task[ files.Length ]; for ( var i = 0; i < files.Length; i++ ) { var file = files[ i ]; @@ -799,7 +801,7 @@ namespace DepotDownloader try { - await semaphore.WaitAsync().ConfigureAwait( false ); + await ioSemaphore.WaitAsync().ConfigureAwait( false ); cts.Token.ThrowIfCancellationRequested(); string fileFinalPath = Path.Combine( depot.installDir, file.FileName ); @@ -910,99 +912,136 @@ namespace DepotDownloader } } - foreach ( var chunk in neededChunks ) - { - if ( cts.IsCancellationRequested ) break; + var fileSemaphore = new SemaphoreSlim(1); - string chunkID = Util.EncodeHexString( chunk.ChunkID ); - CDNClient.DepotChunk chunkData = null; + var downloadTasks = new Task[neededChunks.Count]; + for (var x = 0; x < neededChunks.Count; x++) + { + var chunk = neededChunks[x]; - while ( !cts.IsCancellationRequested ) + var downloadTask = Task.Run(async () => { - Tuple connection; + cts.Token.ThrowIfCancellationRequested(); + try { - connection = await cdnPool.GetConnectionForDepot( appId, depot.id, cts.Token ); - } - catch ( OperationCanceledException ) - { - break; - } + await networkSemaphore.WaitAsync().ConfigureAwait(false); + cts.Token.ThrowIfCancellationRequested(); - DepotManifest.ChunkData data = new DepotManifest.ChunkData(); - data.ChunkID = chunk.ChunkID; - data.Checksum = chunk.Checksum; - data.Offset = chunk.Offset; - data.CompressedLength = chunk.CompressedLength; - data.UncompressedLength = chunk.UncompressedLength; + string chunkID = Util.EncodeHexString(chunk.ChunkID); + CDNClient.DepotChunk chunkData = null; - try - { - chunkData = await cdnPool.CDNClient.DownloadDepotChunkAsync( depot.id, data, - connection.Item1, connection.Item2, depot.depotKey ).ConfigureAwait( false ); - cdnPool.ReturnConnection( connection ); - break; - } - catch ( SteamKitWebRequestException e ) - { - cdnPool.ReturnBrokenConnection( connection ); + while (!cts.IsCancellationRequested) + { + Tuple connection; + try + { + connection = await cdnPool.GetConnectionForDepot(appId, depot.id, cts.Token); + } + catch (OperationCanceledException) + { + break; + } + + DepotManifest.ChunkData data = new DepotManifest.ChunkData(); + data.ChunkID = chunk.ChunkID; + data.Checksum = chunk.Checksum; + data.Offset = chunk.Offset; + data.CompressedLength = chunk.CompressedLength; + data.UncompressedLength = chunk.UncompressedLength; + + try + { + chunkData = await cdnPool.CDNClient.DownloadDepotChunkAsync(depot.id, data, + connection.Item1, connection.Item2, depot.depotKey).ConfigureAwait(false); + cdnPool.ReturnConnection(connection); + + break; + } + catch (SteamKitWebRequestException e) + { + cdnPool.ReturnBrokenConnection(connection); + + if (e.StatusCode == HttpStatusCode.Unauthorized || e.StatusCode == HttpStatusCode.Forbidden) + { + Console.WriteLine("Encountered 401 for chunk {0}. Aborting.", chunkID); + cts.Cancel(); + break; + } + else + { + Console.WriteLine("Encountered error downloading chunk {0}: {1}", chunkID, e.StatusCode); + } + } + catch (TaskCanceledException) + { + Console.WriteLine("Connection timeout downloading chunk {0}", chunkID); + } + catch (Exception e) + { + cdnPool.ReturnBrokenConnection(connection); + Console.WriteLine("Encountered unexpected error downloading chunk {0}: {1}", chunkID, e.Message); + } + } - if ( e.StatusCode == HttpStatusCode.Unauthorized || e.StatusCode == HttpStatusCode.Forbidden ) + if (chunkData == null) { - Console.WriteLine( "Encountered 401 for chunk {0}. Aborting.", chunkID ); + Console.WriteLine("Failed to find any server with chunk {0} for depot {1}. Aborting.", chunkID, depot.id); cts.Cancel(); - break; } - else + + // Throw the cancellation exception if requested so that this task is marked failed + cts.Token.ThrowIfCancellationRequested(); + + try { - Console.WriteLine( "Encountered error downloading chunk {0}: {1}", chunkID, e.StatusCode ); + await fileSemaphore.WaitAsync().ConfigureAwait(false); + + fs.Seek((long)chunkData.ChunkInfo.Offset, SeekOrigin.Begin); + fs.Write(chunkData.Data, 0, chunkData.Data.Length); + + return chunkData.ChunkInfo; + } + finally + { + fileSemaphore.Release(); } } - catch ( TaskCanceledException ) - { - Console.WriteLine( "Connection timeout downloading chunk {0}", chunkID ); - } - catch ( Exception e ) + finally { - cdnPool.ReturnBrokenConnection( connection ); - Console.WriteLine( "Encountered unexpected error downloading chunk {0}: {1}", chunkID, e.Message ); + networkSemaphore.Release(); } - } + }); - if ( chunkData == null ) - { - Console.WriteLine( "Failed to find any server with chunk {0} for depot {1}. Aborting.", chunkID, depot.id ); - cts.Cancel(); - } + downloadTasks[x] = downloadTask; + } - // Throw the cancellation exception if requested so that this task is marked failed - cts.Token.ThrowIfCancellationRequested(); + var completedDownloads = await Task.WhenAll(downloadTasks).ConfigureAwait(false); - TotalBytesCompressed += chunk.CompressedLength; - DepotBytesCompressed += chunk.CompressedLength; - TotalBytesUncompressed += chunk.UncompressedLength; - DepotBytesUncompressed += chunk.UncompressedLength; + fs.Dispose(); - fs.Seek( ( long )chunk.Offset, SeekOrigin.Begin ); - fs.Write( chunkData.Data, 0, chunkData.Data.Length ); + foreach (var chunkInfo in completedDownloads) + { + TotalBytesCompressed += chunkInfo.CompressedLength; + DepotBytesCompressed += chunkInfo.CompressedLength; + TotalBytesUncompressed += chunkInfo.UncompressedLength; + DepotBytesUncompressed += chunkInfo.UncompressedLength; - size_downloaded += chunk.UncompressedLength; + size_downloaded += chunkInfo.UncompressedLength; } - fs.Dispose(); - - Console.WriteLine( "{0,6:#00.00}% {1}", ( ( float )size_downloaded / ( float )complete_download_size ) * 100.0f, fileFinalPath ); + Console.WriteLine("{0,6:#00.00}% {1}", ((float)size_downloaded / (float)complete_download_size) * 100.0f, fileFinalPath); } finally { - semaphore.Release(); + ioSemaphore.Release(); } } ); - tasks[ i ] = task; + ioTasks[ i ] = task; } - await Task.WhenAll( tasks ).ConfigureAwait( false ); + await Task.WhenAll(ioTasks).ConfigureAwait(false); DepotConfigStore.Instance.InstalledManifestIDs[ depot.id ] = depot.manifestId; DepotConfigStore.Save(); diff --git a/DepotDownloader/Program.cs b/DepotDownloader/Program.cs index b3b9b737..47a34794 100644 --- a/DepotDownloader/Program.cs +++ b/DepotDownloader/Program.cs @@ -108,7 +108,7 @@ namespace DepotDownloader ContentDownloader.Config.VerifyAll = HasParameter( args, "-verify-all" ) || HasParameter( args, "-verify_all" ) || HasParameter( args, "-validate" ); ContentDownloader.Config.MaxServers = GetParameter( args, "-max-servers", 20 ); - ContentDownloader.Config.MaxDownloads = GetParameter( args, "-max-downloads", 4 ); + ContentDownloader.Config.MaxDownloads = GetParameter( args, "-max-downloads", 8 ); ContentDownloader.Config.MaxServers = Math.Max( ContentDownloader.Config.MaxServers, ContentDownloader.Config.MaxDownloads ); ContentDownloader.Config.LoginID = HasParameter( args, "-loginid" ) ? (uint?)GetParameter( args, "-loginid" ) : null; @@ -341,7 +341,7 @@ namespace DepotDownloader Console.WriteLine(); Console.WriteLine( "\t-manifest-only\t\t\t- downloads a human readable manifest for any depots that would be downloaded." ); Console.WriteLine( "\t-cellid <#>\t\t\t\t- the overridden CellID of the content server to download from." ); - Console.WriteLine( "\t-max-servers <#>\t\t- maximum number of content servers to use. (default: 8)." ); + Console.WriteLine( "\t-max-servers <#>\t\t- maximum number of content servers to use. (default: 20)." ); Console.WriteLine( "\t-max-downloads <#>\t\t- maximum number of chunks to download concurrently. (default: 4)." ); Console.WriteLine( "\t-loginid <#>\t\t- a unique 32-bit integer Steam LogonID in decimal, required if running multiple instances of DepotDownloader concurrently." ); } diff --git a/DepotDownloader/Steam3Session.cs b/DepotDownloader/Steam3Session.cs index ec162698..853abb17 100644 --- a/DepotDownloader/Steam3Session.cs +++ b/DepotDownloader/Steam3Session.cs @@ -129,16 +129,19 @@ namespace DepotDownloader } public delegate bool WaitCondition(); + private object steamLock = new object(); public bool WaitUntilCallback( Action submitter, WaitCondition waiter ) { while ( !bAborted && !waiter() ) { - submitter(); + lock (steamLock) + submitter(); int seq = this.seq; do { - WaitForCallbacks(); + lock (steamLock) + WaitForCallbacks(); } while ( !bAborted && this.seq == seq && !waiter() ); } @@ -473,7 +476,7 @@ namespace DepotDownloader public void TryWaitForLoginKey() { - if ( logonDetails.Username == null || !ContentDownloader.Config.RememberPassword ) return; + if ( logonDetails.Username == null || !credentials.LoggedOn || !ContentDownloader.Config.RememberPassword ) return; var totalWaitPeriod = DateTime.Now.AddSeconds( 3 ); @@ -584,7 +587,7 @@ namespace DepotDownloader } else { - Console.WriteLine( "Login key was expired. Please enter your password: " ); + Console.Write( "Login key was expired. Please enter your password: " ); logonDetails.Password = Util.ReadPassword(); } } diff --git a/README.md b/README.md index 118f79ff..82444486 100644 --- a/README.md +++ b/README.md @@ -51,5 +51,5 @@ Parameter | Description -manifest-only | downloads a human readable manifest for any depots that would be downloaded. -cellid \<#> | the overridden CellID of the content server to download from. -max-servers \<#> | maximum number of content servers to use. (default: 8). --max-downloads \<#> | maximum number of chunks to download concurrently. (default: 4). +-max-downloads \<#> | maximum number of chunks to download concurrently. (default: 8). -loginid \<#> | a unique 32-bit integer Steam LogonID in decimal, required if running multiple instances of DepotDownloader concurrently.