From 663d17834a488da8608b65206d16d276c40ccdb1 Mon Sep 17 00:00:00 2001 From: Ryan Kistner Date: Wed, 29 Oct 2014 17:11:33 -0600 Subject: [PATCH] Parallelize CollectCDNClientsForDepot to ensure minimum amount of servers until exhausting the list Switched cdnClients to a BlockingQueue to prevent bad cdn clients being dropped from starving other threads --- DepotDownloader/ContentDownloader.cs | 98 +++++++++++++++++----------- 1 file changed, 59 insertions(+), 39 deletions(-) diff --git a/DepotDownloader/ContentDownloader.cs b/DepotDownloader/ContentDownloader.cs index 8c137e6c..1e191547 100644 --- a/DepotDownloader/ContentDownloader.cs +++ b/DepotDownloader/ContentDownloader.cs @@ -9,6 +9,7 @@ using System.Text.RegularExpressions; using System.Threading; using SteamKit2; using System.Collections.Concurrent; +using System.Threading.Tasks; namespace DepotDownloader { @@ -390,7 +391,14 @@ namespace DepotDownloader } } - DownloadSteam3( infos ); + try + { + DownloadSteam3(infos); + } + catch (OperationCanceledException) + { + Console.WriteLine("App {0} was not completely downloaded.", appId); + } } static DepotDownloadInfo GetDepotInfo(uint depotId, uint appId, string branch) @@ -451,9 +459,9 @@ namespace DepotDownloader public ProtoManifest.ChunkData NewChunk { get; private set; } } - private static ConcurrentQueue CollectCDNClientsForDepot(DepotDownloadInfo depot) + private static BlockingCollection CollectCDNClientsForDepot(DepotDownloadInfo depot) { - var cdnClients = new ConcurrentQueue(); + var cdnClients = new BlockingCollection(); CDNClient initialClient = new CDNClient( steam3.steamClient, steam3.AppTickets[depot.id] ); List cdnServers = null; int tries = 5; @@ -478,47 +486,39 @@ namespace DepotDownloader } // Grab up to the first eight server in the allegedly best-to-worst order from Steam - var limit = cdnServers.Take( Config.MaxServers ); - tries = 0; - foreach( var s in limit ) + Parallel.ForEach(cdnServers, (server, parallelLoop) => { - CDNClient c; - - if ( tries == 0 ) - { - c = initialClient; - } - else - { - c = new CDNClient( steam3.steamClient, steam3.AppTickets[depot.id] ); - } + CDNClient c = new CDNClient(steam3.steamClient, steam3.AppTickets[depot.id]); try { - c.Connect( s ); + c.Connect(server); string cdnAuthToken = null; - if ( s.Type == "CDN" ) + if (server.Type == "CDN") { - steam3.RequestCDNAuthToken(depot.id, s.Host); - cdnAuthToken = steam3.CDNAuthTokens[Tuple.Create(depot.id, s.Host)].Token; + steam3.RequestCDNAuthToken(depot.id, server.Host); + cdnAuthToken = steam3.CDNAuthTokens[Tuple.Create(depot.id, server.Host)].Token; } - c.AuthenticateDepot( depot.id, depot.depotKey, cdnAuthToken ); - cdnClients.Enqueue( c ); + c.AuthenticateDepot(depot.id, depot.depotKey, cdnAuthToken); + cdnClients.Add(c); + + if (cdnClients.Count >= Config.MaxServers) parallelLoop.Stop(); } catch { - Console.WriteLine( "\nFailed to connect to content server {0}. Remaining content servers for depot: {1}.", s, Config.MaxServers - tries - 1 ); + Console.WriteLine("\nFailed to connect to content server {0}", server); } - tries++; - } + }); if ( cdnClients.Count == 0 ) { Console.WriteLine( "\nUnable to find any content servers for depot {0} - {1}", depot.id, depot.contentName ); } + Config.MaxDownloads = Math.Min(Config.MaxDownloads, cdnClients.Count); + return cdnClients; } @@ -536,7 +536,9 @@ namespace DepotDownloader Console.Write("Finding content servers..."); var cdnClientsLock = new Object(); - ConcurrentQueue cdnClients = null; + BlockingCollection cdnClients = null; + int liveCdnClients = 0; + CancellationTokenSource cts = new CancellationTokenSource(); Console.WriteLine(" Done!"); @@ -582,6 +584,7 @@ namespace DepotDownloader DepotManifest depotManifest = null; cdnClients = CollectCDNClientsForDepot(depot); + liveCdnClients = cdnClients.Count; foreach (var c in cdnClients) { @@ -656,7 +659,7 @@ namespace DepotDownloader var rand = new Random(); filesAfterExclusions.Where(f => !f.Flags.HasFlag(EDepotFileFlag.Directory)) - .AsParallel().WithDegreeOfParallelism(Config.MaxDownloads) + .AsParallel().WithCancellation(cts.Token).WithDegreeOfParallelism(Config.MaxDownloads) .ForAll(file => { string fileFinalPath = Path.Combine(depot.installDir, file.FileName); @@ -775,35 +778,52 @@ namespace DepotDownloader if (cdnClients == null) { cdnClients = CollectCDNClientsForDepot(depot); + liveCdnClients = cdnClients.Count; } } } foreach (var chunk in neededChunks) { - string chunkID = Util.EncodeHexString(chunk.ChunkID); + if (cts.IsCancellationRequested) break; + string chunkID = Util.EncodeHexString(chunk.ChunkID); CDNClient.DepotChunk chunkData = null; - CDNClient client = null; - while (cdnClients.TryDequeue(out client)) + while (liveCdnClients > 0 && !cts.IsCancellationRequested) { - DepotManifest.ChunkData data = new DepotManifest.ChunkData(); - data.ChunkID = chunk.ChunkID; - data.Checksum = chunk.Checksum; - data.Offset = chunk.Offset; - data.CompressedLength = chunk.CompressedLength; - data.UncompressedLength = chunk.UncompressedLength; + CDNClient client; + try + { + client = cdnClients.Take(cts.Token); + } + catch (OperationCanceledException) + { + break; + } + + DepotManifest.ChunkData data = new DepotManifest.ChunkData(); + data.ChunkID = chunk.ChunkID; + data.Checksum = chunk.Checksum; + data.Offset = chunk.Offset; + data.CompressedLength = chunk.CompressedLength; + data.UncompressedLength = chunk.UncompressedLength; try { chunkData = client.DownloadDepotChunk(depot.id, data); - cdnClients.Enqueue(client); + cdnClients.Add(client); break; } - catch + catch (Exception e) { - Console.WriteLine("Encountered error downloading chunk {0}", chunkID); + Console.WriteLine("Encountered error downloading chunk {0}: {1} (live cdn clients: {2})", chunkID, e.Message, liveCdnClients); + int liveCount = Interlocked.Decrement(ref liveCdnClients); + if (liveCount == 0) + { + // we've run out of clients, tell other threads to abort + cts.Cancel(); + } } }