Parallelize CollectCDNClientsForDepot to ensure minimum amount of servers until exhausting the list

Switched cdnClients to a BlockingQueue to prevent bad cdn clients being dropped from starving other threads
pull/8/head
Ryan Kistner 11 years ago
parent eba7234f91
commit 663d17834a

@ -9,6 +9,7 @@ using System.Text.RegularExpressions;
using System.Threading;
using SteamKit2;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace DepotDownloader
{
@ -390,7 +391,14 @@ namespace DepotDownloader
}
}
DownloadSteam3( infos );
try
{
DownloadSteam3(infos);
}
catch (OperationCanceledException)
{
Console.WriteLine("App {0} was not completely downloaded.", appId);
}
}
static DepotDownloadInfo GetDepotInfo(uint depotId, uint appId, string branch)
@ -451,9 +459,9 @@ namespace DepotDownloader
public ProtoManifest.ChunkData NewChunk { get; private set; }
}
private static ConcurrentQueue<CDNClient> CollectCDNClientsForDepot(DepotDownloadInfo depot)
private static BlockingCollection<CDNClient> CollectCDNClientsForDepot(DepotDownloadInfo depot)
{
var cdnClients = new ConcurrentQueue<CDNClient>();
var cdnClients = new BlockingCollection<CDNClient>();
CDNClient initialClient = new CDNClient( steam3.steamClient, steam3.AppTickets[depot.id] );
List<CDNClient.Server> cdnServers = null;
int tries = 5;
@ -478,47 +486,39 @@ namespace DepotDownloader
}
// Grab up to the first eight server in the allegedly best-to-worst order from Steam
var limit = cdnServers.Take( Config.MaxServers );
tries = 0;
foreach( var s in limit )
Parallel.ForEach(cdnServers, (server, parallelLoop) =>
{
CDNClient c;
if ( tries == 0 )
{
c = initialClient;
}
else
{
c = new CDNClient( steam3.steamClient, steam3.AppTickets[depot.id] );
}
CDNClient c = new CDNClient(steam3.steamClient, steam3.AppTickets[depot.id]);
try
{
c.Connect( s );
c.Connect(server);
string cdnAuthToken = null;
if ( s.Type == "CDN" )
if (server.Type == "CDN")
{
steam3.RequestCDNAuthToken(depot.id, s.Host);
cdnAuthToken = steam3.CDNAuthTokens[Tuple.Create(depot.id, s.Host)].Token;
steam3.RequestCDNAuthToken(depot.id, server.Host);
cdnAuthToken = steam3.CDNAuthTokens[Tuple.Create(depot.id, server.Host)].Token;
}
c.AuthenticateDepot( depot.id, depot.depotKey, cdnAuthToken );
cdnClients.Enqueue( c );
c.AuthenticateDepot(depot.id, depot.depotKey, cdnAuthToken);
cdnClients.Add(c);
if (cdnClients.Count >= Config.MaxServers) parallelLoop.Stop();
}
catch
{
Console.WriteLine( "\nFailed to connect to content server {0}. Remaining content servers for depot: {1}.", s, Config.MaxServers - tries - 1 );
Console.WriteLine("\nFailed to connect to content server {0}", server);
}
tries++;
}
});
if ( cdnClients.Count == 0 )
{
Console.WriteLine( "\nUnable to find any content servers for depot {0} - {1}", depot.id, depot.contentName );
}
Config.MaxDownloads = Math.Min(Config.MaxDownloads, cdnClients.Count);
return cdnClients;
}
@ -536,7 +536,9 @@ namespace DepotDownloader
Console.Write("Finding content servers...");
var cdnClientsLock = new Object();
ConcurrentQueue<CDNClient> cdnClients = null;
BlockingCollection<CDNClient> cdnClients = null;
int liveCdnClients = 0;
CancellationTokenSource cts = new CancellationTokenSource();
Console.WriteLine(" Done!");
@ -582,6 +584,7 @@ namespace DepotDownloader
DepotManifest depotManifest = null;
cdnClients = CollectCDNClientsForDepot(depot);
liveCdnClients = cdnClients.Count;
foreach (var c in cdnClients)
{
@ -656,7 +659,7 @@ namespace DepotDownloader
var rand = new Random();
filesAfterExclusions.Where(f => !f.Flags.HasFlag(EDepotFileFlag.Directory))
.AsParallel().WithDegreeOfParallelism(Config.MaxDownloads)
.AsParallel().WithCancellation(cts.Token).WithDegreeOfParallelism(Config.MaxDownloads)
.ForAll(file =>
{
string fileFinalPath = Path.Combine(depot.installDir, file.FileName);
@ -775,35 +778,52 @@ namespace DepotDownloader
if (cdnClients == null)
{
cdnClients = CollectCDNClientsForDepot(depot);
liveCdnClients = cdnClients.Count;
}
}
}
foreach (var chunk in neededChunks)
{
string chunkID = Util.EncodeHexString(chunk.ChunkID);
if (cts.IsCancellationRequested) break;
string chunkID = Util.EncodeHexString(chunk.ChunkID);
CDNClient.DepotChunk chunkData = null;
CDNClient client = null;
while (cdnClients.TryDequeue(out client))
while (liveCdnClients > 0 && !cts.IsCancellationRequested)
{
DepotManifest.ChunkData data = new DepotManifest.ChunkData();
data.ChunkID = chunk.ChunkID;
data.Checksum = chunk.Checksum;
data.Offset = chunk.Offset;
data.CompressedLength = chunk.CompressedLength;
data.UncompressedLength = chunk.UncompressedLength;
CDNClient client;
try
{
client = cdnClients.Take(cts.Token);
}
catch (OperationCanceledException)
{
break;
}
DepotManifest.ChunkData data = new DepotManifest.ChunkData();
data.ChunkID = chunk.ChunkID;
data.Checksum = chunk.Checksum;
data.Offset = chunk.Offset;
data.CompressedLength = chunk.CompressedLength;
data.UncompressedLength = chunk.UncompressedLength;
try
{
chunkData = client.DownloadDepotChunk(depot.id, data);
cdnClients.Enqueue(client);
cdnClients.Add(client);
break;
}
catch
catch (Exception e)
{
Console.WriteLine("Encountered error downloading chunk {0}", chunkID);
Console.WriteLine("Encountered error downloading chunk {0}: {1} (live cdn clients: {2})", chunkID, e.Message, liveCdnClients);
int liveCount = Interlocked.Decrement(ref liveCdnClients);
if (liveCount == 0)
{
// we've run out of clients, tell other threads to abort
cts.Cancel();
}
}
}

Loading…
Cancel
Save