diff --git a/DepotDownloader/CDNClientPool.cs b/DepotDownloader/CDNClientPool.cs new file mode 100644 index 00000000..843118ee --- /dev/null +++ b/DepotDownloader/CDNClientPool.cs @@ -0,0 +1,235 @@ +using SteamKit2; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DepotDownloader +{ + /// + /// CDNClientPool provides a pool of CDNClients to CDN endpoints + /// CDNClients that get re-used will be initialized for the correct depots + /// + class CDNClientPool + { + private static int ServerEndpointMinimumSize = 8; + + private Steam3Session steamSession; + + private ConcurrentBag activeClientPool; + private ConcurrentDictionary> activeClientAuthed; + private BlockingCollection availableServerEndpoints; + + private AutoResetEvent populatePoolEvent; + private Thread monitorThread; + + public CDNClientPool(Steam3Session steamSession) + { + this.steamSession = steamSession; + + activeClientPool = new ConcurrentBag(); + activeClientAuthed = new ConcurrentDictionary>(); + availableServerEndpoints = new BlockingCollection(); + + populatePoolEvent = new AutoResetEvent(true); + + monitorThread = new Thread(ConnectionPoolMonitor); + monitorThread.Name = "CDNClient Pool Monitor"; + monitorThread.IsBackground = true; + monitorThread.Start(); + } + + private List fetchBootstrapServerList() + { + CDNClient bootstrap = new CDNClient(steamSession.steamClient); + + while (true) + { + try + { + var cdnServers = bootstrap.FetchServerList(cellId: (uint)ContentDownloader.Config.CellID); + if (cdnServers != null) + { + return cdnServers; + } + } + catch (Exception ex) + { + Console.WriteLine("Failed to retrieve content server list: {0}", ex.Message); + } + } + } + + private void ConnectionPoolMonitor() + { + while(true) + { + populatePoolEvent.WaitOne(TimeSpan.FromSeconds(1)); + + // peek ahead into steam session to see if we have servers + if (availableServerEndpoints.Count < ServerEndpointMinimumSize && + steamSession.steamClient.IsConnected && + steamSession.steamClient.GetServersOfType(EServerType.CS).Count > 0) + { + var servers = fetchBootstrapServerList(); + + var weightedCdnServers = servers.Select(x => + { + int penalty = 0; + ConfigStore.TheConfig.ContentServerPenalty.TryGetValue(x.Host, out penalty); + + return Tuple.Create(x, penalty); + }).OrderBy(x => x.Item2).ThenBy(x => x.Item1.WeightedLoad); + + foreach (var endpoint in weightedCdnServers) + { + for (var i = 0; i < endpoint.Item1.NumEntries; i++) { + availableServerEndpoints.Add(endpoint.Item1); + } + } + } + } + } + + private void releaseConnection(CDNClient client) + { + Tuple authData; + activeClientAuthed.TryRemove(client, out authData); + } + + private CDNClient buildConnection(uint depotId, byte[] depotKey, CDNClient.Server serverSeed, CancellationToken token) + { + CDNClient.Server server = null; + CDNClient client = null; + + while (client == null) + { + // if we want to re-initialize a specific content server, try that one first + if (serverSeed != null) + { + server = serverSeed; + serverSeed = null; + } + else + { + if (availableServerEndpoints.Count < ServerEndpointMinimumSize) + { + populatePoolEvent.Set(); + } + + server = availableServerEndpoints.Take(token); + } + + client = new CDNClient(steamSession.steamClient, steamSession.AppTickets[depotId]); + + string cdnAuthToken = null; + + if (server.Type == "CDN") + { + steamSession.RequestCDNAuthToken(depotId, server.Host); + cdnAuthToken = steamSession.CDNAuthTokens[Tuple.Create(depotId, server.Host)].Token; + } + + try + { + client.Connect(server); + client.AuthenticateDepot(depotId, depotKey, cdnAuthToken); + } + catch (Exception ex) + { + client = null; + + Console.WriteLine("Failed to connect to content server {0}: {1}", server, ex.Message); + + int penalty = 0; + ConfigStore.TheConfig.ContentServerPenalty.TryGetValue(server.Host, out penalty); + ConfigStore.TheConfig.ContentServerPenalty[server.Host] = penalty + 1; + } + } + + Console.WriteLine("Initialized connection to content server {0} with depot id {1}", server, depotId); + + activeClientAuthed[client] = Tuple.Create(depotId, server); + return client; + } + + private bool reauthConnection(CDNClient client, CDNClient.Server server, uint depotId, byte[] depotKey) + { + DebugLog.Assert(server.Type == "CDN" || steamSession.AppTickets[depotId] == null, "CDNClientPool", "Re-authing a CDN or anonymous connection"); + + String cdnAuthToken = null; + + if (server.Type == "CDN") + { + steamSession.RequestCDNAuthToken(depotId, server.Host); + cdnAuthToken = steamSession.CDNAuthTokens[Tuple.Create(depotId, server.Host)].Token; + } + + try + { + client.AuthenticateDepot(depotId, depotKey, cdnAuthToken); + activeClientAuthed[client] = Tuple.Create(depotId, server); + return true; + } + catch (Exception ex) + { + Console.WriteLine("Failed to reauth to content server {0}: {1}", server, ex.Message); + } + + return false; + } + + public CDNClient getConnectionForDepot(uint depotId, byte[] depotKey, CancellationToken token) + { + CDNClient client = null; + + Tuple authData; + + activeClientPool.TryTake(out client); + + // if we couldn't find a connection, make one now + if (client == null) + { + client = buildConnection(depotId, depotKey, null, token); + } + + // if we couldn't find the authorization data or it's not authed to this depotid, re-initialize + if (!activeClientAuthed.TryGetValue(client, out authData) || authData.Item1 != depotId) + { + if (authData.Item2.Type == "CDN" && reauthConnection(client, authData.Item2, depotId, depotKey)) + { + Console.WriteLine("Re-authed CDN connection to content server {0} from {1} to {2}", authData.Item2, authData.Item1, depotId); + } + else if (authData.Item2.Type == "CS" && steamSession.AppTickets[depotId] == null && reauthConnection(client, authData.Item2, depotId, depotKey)) + { + Console.WriteLine("Re-authed anonymous connection to content server {0} from {1} to {2}", authData.Item2, authData.Item1, depotId); + } + else + { + releaseConnection(client); + client = buildConnection(depotId, depotKey, authData.Item2, token); + } + } + + return client; + } + + public void returnConnection(CDNClient client) + { + if (client == null) return; + + activeClientPool.Add(client); + } + + public void returnBrokenConnection(CDNClient client) + { + if (client == null) return; + + releaseConnection(client); + } + } +} diff --git a/DepotDownloader/ContentDownloader.cs b/DepotDownloader/ContentDownloader.cs index dd1d914e..2005372f 100644 --- a/DepotDownloader/ContentDownloader.cs +++ b/DepotDownloader/ContentDownloader.cs @@ -23,6 +23,7 @@ namespace DepotDownloader private static Steam3Session steam3; private static Steam3Session.Credentials steam3Credentials; + private static CDNClientPool cdnPool; private const string DEFAULT_DOWNLOAD_DIR = "depots"; private const string CONFIG_DIR = ".DepotDownloader"; @@ -309,6 +310,8 @@ namespace DepotDownloader Console.WriteLine("Unable to get steam3 credentials."); return; } + + cdnPool = new CDNClientPool(steam3); } public static void ShutdownSteam3() @@ -465,87 +468,6 @@ namespace DepotDownloader public ProtoManifest.ChunkData NewChunk { get; private set; } } - private static BlockingCollection CollectCDNClientsForDepot(DepotDownloadInfo depot) - { - Console.WriteLine("Finding content servers..."); - - var cdnClients = new BlockingCollection(); - CDNClient initialClient = new CDNClient( steam3.steamClient, steam3.AppTickets[depot.id] ); - List cdnServers = null; - - while(true) - { - try - { - cdnServers = initialClient.FetchServerList( cellId: (uint)Config.CellID ); - if (cdnServers != null) break; - } - catch (WebException) - { - Console.WriteLine("\nFailed to retrieve content server list."); - Thread.Sleep(500); - } - } - - if (cdnServers == null) - { - Console.WriteLine("\nUnable to query any content servers for depot {0} - {1}", depot.id, depot.contentName); - return cdnClients; - } - - var weightedCdnServers = cdnServers.Select(x => - { - int penalty = 0; - ConfigStore.TheConfig.ContentServerPenalty.TryGetValue(x.Host, out penalty); - - return Tuple.Create(x, penalty); - }).OrderBy(x => x.Item2).ThenBy(x => x.Item1.WeightedLoad); - - // Grab up to the first eight server in the allegedly best-to-worst order from Steam - Parallel.ForEach(weightedCdnServers, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (serverTuple, parallelLoop) => - { - var server = serverTuple.Item1; - CDNClient c = new CDNClient( steam3.steamClient, steam3.AppTickets[ depot.id ] ); - - try - { - for (int i = 0; i < server.NumEntries; i++) - { - c.Connect(server); - - string cdnAuthToken = null; - if (server.Type == "CDN") - { - steam3.RequestCDNAuthToken(depot.id, server.Host); - cdnAuthToken = steam3.CDNAuthTokens[Tuple.Create(depot.id, server.Host)].Token; - } - - c.AuthenticateDepot(depot.id, depot.depotKey, cdnAuthToken); - cdnClients.Add(c); - } - - if (cdnClients.Count >= Config.MaxServers) parallelLoop.Stop(); - } - catch (Exception ex) - { - int penalty = 0; - ConfigStore.TheConfig.ContentServerPenalty.TryGetValue(server.Host, out penalty); - ConfigStore.TheConfig.ContentServerPenalty[server.Host] = penalty + 1; - - Console.WriteLine("\nFailed to connect to content server {0}: {1}", server, ex.Message); - } - }); - - if (cdnClients.Count == 0) - { - Console.WriteLine("\nUnable to find any content servers for depot {0} - {1}", depot.id, depot.contentName); - } - - Config.MaxDownloads = Math.Min(Config.MaxDownloads, cdnClients.Count); - - return cdnClients; - } - private static void DownloadSteam3( List depots ) { ulong TotalBytesCompressed = 0; @@ -558,8 +480,6 @@ namespace DepotDownloader Console.WriteLine("Downloading depot {0} - {1}", depot.id, depot.contentName); - var cdnClientsLock = new Object(); - BlockingCollection cdnClients = null; CancellationTokenSource cts = new CancellationTokenSource(); ProtoManifest oldProtoManifest = null; @@ -603,17 +523,43 @@ namespace DepotDownloader DepotManifest depotManifest = null; - cdnClients = CollectCDNClientsForDepot(depot); - - foreach (var c in cdnClients) + while (depotManifest == null) { - try + CDNClient client = null; + try { + client = cdnPool.getConnectionForDepot(depot.id, depot.depotKey, CancellationToken.None); + + depotManifest = client.DownloadManifest(depot.id, depot.manifestId); + + cdnPool.returnConnection(client); + } + catch (WebException e) { - depotManifest = c.DownloadManifest(depot.id, depot.manifestId); - break; + cdnPool.returnBrokenConnection(client); + + if (e.Status == WebExceptionStatus.ProtocolError) + { + var response = e.Response as HttpWebResponse; + if (response.StatusCode == HttpStatusCode.Unauthorized || response.StatusCode == HttpStatusCode.Forbidden) + { + Console.WriteLine("Encountered 401 for depot manifest {0} {1}. Aborting.", depot.id, depot.manifestId); + break; + } + else + { + Console.WriteLine("Encountered error downloading depot manifest {0} {1}: {2}", depot.id, depot.manifestId, response.StatusCode); + } + } + else + { + Console.WriteLine("Encountered error downloading manifest for depot {0} {1}: {2}", depot.id, depot.manifestId, e.Status); + } + } + catch (Exception e) + { + cdnPool.returnBrokenConnection(client); + Console.WriteLine("Encountered error downloading manifest for depot {0} {1}: {2}", depot.id, depot.manifestId, e.Message); } - catch (WebException) { } - catch (SocketException) { } } if (depotManifest == null) @@ -787,18 +733,6 @@ namespace DepotDownloader } } - if (neededChunks.Count > 0 && cdnClients == null) - { - // If we didn't need to connect to get manifests, connect now. - lock (cdnClientsLock) - { - if (cdnClients == null) - { - cdnClients = CollectCDNClientsForDepot(depot); - } - } - } - foreach (var chunk in neededChunks) { if (cts.IsCancellationRequested) break; @@ -811,7 +745,7 @@ namespace DepotDownloader CDNClient client; try { - client = cdnClients.Take(cts.Token); + client = cdnPool.getConnectionForDepot(depot.id, depot.depotKey, cts.Token); } catch (OperationCanceledException) { @@ -828,10 +762,13 @@ namespace DepotDownloader try { chunkData = client.DownloadDepotChunk(depot.id, data); + cdnPool.returnConnection(client); break; } catch (WebException e) { + cdnPool.returnBrokenConnection(client); + if (e.Status == WebExceptionStatus.ProtocolError) { var response = e.Response as HttpWebResponse; @@ -850,18 +787,12 @@ namespace DepotDownloader { Console.WriteLine("Encountered error downloading chunk {0}: {1}", chunkID, e.Status); } - - // let client "cool off" before re-adding it to the queue - Thread.Sleep(500); } catch (Exception e) { + cdnPool.returnBrokenConnection(client); Console.WriteLine("Encountered unexpected error downloading chunk {0}: {1}", chunkID, e.Message); } - finally - { - cdnClients.Add(client); - } } if (chunkData == null) diff --git a/DepotDownloader/DepotDownloader.csproj b/DepotDownloader/DepotDownloader.csproj index 4feb893f..3857acc6 100644 --- a/DepotDownloader/DepotDownloader.csproj +++ b/DepotDownloader/DepotDownloader.csproj @@ -80,6 +80,7 @@ + diff --git a/DepotDownloader/Steam3Session.cs b/DepotDownloader/Steam3Session.cs index 504a63df..575ba89c 100644 --- a/DepotDownloader/Steam3Session.cs +++ b/DepotDownloader/Steam3Session.cs @@ -292,7 +292,7 @@ namespace DepotDownloader Action cbMethod = (cdnAuth) => { completed = true; - Console.WriteLine("Got CDN auth token for {0} result: {1}", host, cdnAuth.Result); + Console.WriteLine("Got CDN auth token for {0} result: {1} (expires {2})", host, cdnAuth.Result, cdnAuth.Expiration); if (cdnAuth.Result != EResult.OK) {