From 7476cdb0a393e416de7318bffcd06f78bdfbcbd3 Mon Sep 17 00:00:00 2001 From: Ryan Kistner Date: Sat, 18 Apr 2020 12:09:39 -0600 Subject: [PATCH] Updated to SteamKit 2.3.0 --- DepotDownloader/CDNClientPool.cs | 215 ++++++++----------------- DepotDownloader/ContentDownloader.cs | 66 +++----- DepotDownloader/DepotDownloader.csproj | 2 +- 3 files changed, 92 insertions(+), 191 deletions(-) diff --git a/DepotDownloader/CDNClientPool.cs b/DepotDownloader/CDNClientPool.cs index 964a354e..6dc1c3e0 100644 --- a/DepotDownloader/CDNClientPool.cs +++ b/DepotDownloader/CDNClientPool.cs @@ -3,36 +3,40 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Net; using System.Threading; using System.Threading.Tasks; namespace DepotDownloader { /// - /// CDNClientPool provides a pool of CDNClients to CDN endpoints - /// CDNClients that get re-used will be initialized for the correct depots + /// CDNClientPool provides a pool of connections to CDN endpoints, requesting CDN tokens as needed /// class CDNClientPool { private const int ServerEndpointMinimumSize = 8; - private Steam3Session steamSession; + private readonly Steam3Session steamSession; - private ConcurrentBag activeClientPool; - private ConcurrentDictionary> activeClientAuthed; - private BlockingCollection availableServerEndpoints; + public CDNClient CDNClient { get; } - private AutoResetEvent populatePoolEvent; - private Task monitorTask; - private CancellationTokenSource shutdownToken; + private readonly ConcurrentDictionary, string> depotCdnKeys; + + private readonly ConcurrentBag activeConnectionPool; + private readonly BlockingCollection availableServerEndpoints; + + private readonly AutoResetEvent populatePoolEvent; + private readonly Task monitorTask; + private readonly CancellationTokenSource shutdownToken; public CancellationTokenSource ExhaustedToken { get; set; } public CDNClientPool(Steam3Session steamSession) { this.steamSession = steamSession; + CDNClient = new CDNClient(steamSession.steamClient); + depotCdnKeys = new ConcurrentDictionary, string>(); - activeClientPool = new ConcurrentBag(); - activeClientAuthed = new ConcurrentDictionary>(); + activeConnectionPool = new ConcurrentBag(); availableServerEndpoints = new BlockingCollection(); populatePoolEvent = new AutoResetEvent(true); @@ -44,10 +48,13 @@ namespace DepotDownloader public void Shutdown() { shutdownToken.Cancel(); + monitorTask.Wait(); } private async Task> FetchBootstrapServerListAsync() { + var backoffDelay = 0; + while (!shutdownToken.IsCancellationRequested) { try @@ -61,6 +68,13 @@ namespace DepotDownloader catch (Exception ex) { Console.WriteLine("Failed to retrieve content server list: {0}", ex.Message); + + if (ex is SteamKitWebRequestException e && e.StatusCode == (HttpStatusCode)429) + { + // If we're being throttled, add a delay to the next request + backoffDelay = Math.Min(5, ++backoffDelay); + await Task.Delay(TimeSpan.FromSeconds(backoffDelay)); + } } } @@ -76,8 +90,7 @@ namespace DepotDownloader populatePoolEvent.WaitOne(TimeSpan.FromSeconds(1)); // We want the Steam session so we can take the CellID from the session and pass it through to the ContentServer Directory Service - if (availableServerEndpoints.Count < ServerEndpointMinimumSize && - steamSession.steamClient.IsConnected) + if (availableServerEndpoints.Count < ServerEndpointMinimumSize && steamSession.steamClient.IsConnected) { var servers = await FetchBootstrapServerListAsync().ConfigureAwait(false); @@ -89,22 +102,22 @@ namespace DepotDownloader var weightedCdnServers = servers.Select(x => { - int penalty = 0; - AccountSettingsStore.Instance.ContentServerPenalty.TryGetValue(x.Host, out penalty); + AccountSettingsStore.Instance.ContentServerPenalty.TryGetValue(x.Host, out var penalty); return Tuple.Create(x, penalty); }).OrderBy(x => x.Item2).ThenBy(x => x.Item1.WeightedLoad); - foreach (var endpoint in weightedCdnServers) + foreach (var (server, weight) in weightedCdnServers) { - for (var i = 0; i < endpoint.Item1.NumEntries; i++) { - availableServerEndpoints.Add(endpoint.Item1); + for (var i = 0; i < server.NumEntries; i++) + { + availableServerEndpoints.Add(server); } } didPopulate = true; - } - else if ( availableServerEndpoints.Count == 0 && !steamSession.steamClient.IsConnected && didPopulate ) + } + else if (availableServerEndpoints.Count == 0 && !steamSession.steamClient.IsConnected && didPopulate) { ExhaustedToken?.Cancel(); return; @@ -112,163 +125,65 @@ namespace DepotDownloader } } - private void ReleaseConnection(CDNClient client) + private string AuthenticateConnection(uint appId, uint depotId, CDNClient.Server server) { - Tuple authData; - activeClientAuthed.TryRemove(client, out authData); - } + steamSession.RequestCDNAuthToken(appId, depotId, server.Host); - private async Task BuildConnectionAsync(uint appId, uint depotId, byte[] depotKey, CDNClient.Server serverSeed, CancellationToken token) - { - CDNClient.Server server = null; - CDNClient client = null; + var cdnKey = $"{depotId:D}:{steamSession.ResolveCDNTopLevelHost(server.Host)}"; - while (client == null) + if (steamSession.CDNAuthTokens.TryGetValue(cdnKey, out var authTokenCallback)) { - // if we want to re-initialize a specific content server, try that one first - if (serverSeed != null) - { - server = serverSeed; - serverSeed = null; - } - else - { - if (availableServerEndpoints.Count < ServerEndpointMinimumSize) - { - populatePoolEvent.Set(); - } - - server = availableServerEndpoints.Take(token); - } - - client = new CDNClient(steamSession.steamClient, steamSession.AppTickets[depotId]); - - string cdnAuthToken = null; - - try - { - if (server.Type == "CDN" || server.Type == "SteamCache") - { - steamSession.RequestCDNAuthToken(appId, depotId, server.Host); - - var cdnKey = string.Format("{0:D}:{1}", depotId, steamSession.ResolveCDNTopLevelHost(server.Host)); - SteamApps.CDNAuthTokenCallback authTokenCallback; - - if (steamSession.CDNAuthTokens.TryGetValue(cdnKey, out authTokenCallback)) - { - cdnAuthToken = authTokenCallback.Token; - } - else - { - throw new Exception(String.Format("Failed to retrieve CDN token for server {0} depot {1}", server.Host, depotId)); - } - } - - await client.ConnectAsync(server).ConfigureAwait(false); - await client.AuthenticateDepotAsync(depotId, depotKey, cdnAuthToken).ConfigureAwait(false); - } - catch (Exception ex) - { - client = null; - - Console.WriteLine("Failed to connect to content server {0}: {1}", server, ex.Message); - - int penalty = 0; - AccountSettingsStore.Instance.ContentServerPenalty.TryGetValue(server.Host, out penalty); - AccountSettingsStore.Instance.ContentServerPenalty[server.Host] = penalty + 1; - } + return authTokenCallback.Token; + } + else + { + throw new Exception($"Failed to retrieve CDN token for server {server.Host} depot {depotId}"); } - - Console.WriteLine("Initialized connection to content server {0} with depot id {1}", server, depotId); - - activeClientAuthed[client] = Tuple.Create(depotId, server); - return client; } - private async Task ReauthConnectionAsync(CDNClient client, CDNClient.Server server, uint appId, uint depotId, byte[] depotKey) + private CDNClient.Server BuildConnection(CancellationToken token) { - DebugLog.Assert(server.Type == "CDN" || server.Type == "SteamCache" || steamSession.AppTickets[depotId] == null, "CDNClientPool", "Re-authing a CDN or anonymous connection"); - - String cdnAuthToken = null; - - try - { - if (server.Type == "CDN" || server.Type == "SteamCache") - { - steamSession.RequestCDNAuthToken(appId, depotId, server.Host); - - var cdnKey = string.Format("{0:D}:{1}", depotId, steamSession.ResolveCDNTopLevelHost(server.Host)); - SteamApps.CDNAuthTokenCallback authTokenCallback; - - if (steamSession.CDNAuthTokens.TryGetValue(cdnKey, out authTokenCallback)) - { - cdnAuthToken = authTokenCallback.Token; - } - else - { - throw new Exception(String.Format("Failed to retrieve CDN token for server {0} depot {1}", server.Host, depotId)); - } - } - - await client.AuthenticateDepotAsync(depotId, depotKey, cdnAuthToken).ConfigureAwait(false); - activeClientAuthed[client] = Tuple.Create(depotId, server); - return true; - } - catch (Exception ex) + if (availableServerEndpoints.Count < ServerEndpointMinimumSize) { - Console.WriteLine("Failed to reauth to content server {0}: {1}", server, ex.Message); + populatePoolEvent.Set(); } - return false; + return availableServerEndpoints.Take(token); } - public async Task GetConnectionForDepotAsync(uint appId, uint depotId, byte[] depotKey, CancellationToken token) + public Tuple GetConnectionForDepot(uint appId, uint depotId, CancellationToken token) { - CDNClient client = null; - - Tuple authData; - - activeClientPool.TryTake(out client); - - // if we couldn't find a connection, make one now - if (client == null) + // Take a free connection from the connection pool + // If there were no free connections, create a new one from the server list + if (!activeConnectionPool.TryTake(out var server)) { - client = await BuildConnectionAsync(appId, depotId, depotKey, null, token).ConfigureAwait(false); + server = BuildConnection(token); } - // if we couldn't find the authorization data or it's not authed to this depotid, re-initialize - if (!activeClientAuthed.TryGetValue(client, out authData) || authData.Item1 != depotId) + var topLevelHost = steamSession.ResolveCDNTopLevelHost(server.Host); + var depotKey = Tuple.Create(depotId, topLevelHost); + + // If we don't have a CDN token yet for this server and depot, fetch one now + if (!depotCdnKeys.TryGetValue(depotKey, out var cdnToken)) { - if ((authData.Item2.Type == "CDN" || authData.Item2.Type == "SteamCache") && await ReauthConnectionAsync(client, authData.Item2, appId, depotId, depotKey).ConfigureAwait(false)) - { - Console.WriteLine("Re-authed CDN connection to content server {0} from {1} to {2}", authData.Item2, authData.Item1, depotId); - } - else if (authData.Item2.Type == "CS" && steamSession.AppTickets[depotId] == null && await ReauthConnectionAsync(client, authData.Item2, appId, depotId, depotKey).ConfigureAwait(false)) - { - Console.WriteLine("Re-authed anonymous connection to content server {0} from {1} to {2}", authData.Item2, authData.Item1, depotId); - } - else - { - ReleaseConnection(client); - client = await BuildConnectionAsync(appId, depotId, depotKey, authData.Item2, token).ConfigureAwait(false); - } + depotCdnKeys[depotKey] = cdnToken = AuthenticateConnection(appId, depotId, server); } - return client; + return Tuple.Create(server, cdnToken); } - - public void ReturnConnection(CDNClient client) + + public void ReturnConnection(Tuple server) { - if (client == null) return; + if (server == null) return; - activeClientPool.Add(client); + activeConnectionPool.Add(server.Item1); } - public void ReturnBrokenConnection(CDNClient client) + public void ReturnBrokenConnection(Tuple server) { - if (client == null) return; + if (server == null) return; - ReleaseConnection(client); + // Broken connections are not returned to the pool } } } diff --git a/DepotDownloader/ContentDownloader.cs b/DepotDownloader/ContentDownloader.cs index 6053b032..a09877d3 100644 --- a/DepotDownloader/ContentDownloader.cs +++ b/DepotDownloader/ContentDownloader.cs @@ -662,40 +662,33 @@ namespace DepotDownloader while ( depotManifest == null ) { - CDNClient client = null; + Tuple connection = null; try { - client = await cdnPool.GetConnectionForDepotAsync( appId, depot.id, depot.depotKey, CancellationToken.None ).ConfigureAwait( false ); + connection = cdnPool.GetConnectionForDepot( appId, depot.id, CancellationToken.None ); - depotManifest = await client.DownloadManifestAsync( depot.id, depot.manifestId ).ConfigureAwait( false ); + depotManifest = await cdnPool.CDNClient.DownloadManifestAsync( depot.id, depot.manifestId, + connection.Item1, connection.Item2, depot.depotKey ).ConfigureAwait(false); - cdnPool.ReturnConnection( client ); + cdnPool.ReturnConnection( connection ); } - catch ( WebException e ) + catch ( SteamKitWebRequestException e ) { - cdnPool.ReturnBrokenConnection( client ); + cdnPool.ReturnBrokenConnection( connection ); - if ( e.Status == WebExceptionStatus.ProtocolError ) + if ( e.StatusCode == HttpStatusCode.Unauthorized || e.StatusCode == HttpStatusCode.Forbidden ) { - var response = e.Response as HttpWebResponse; - if ( response.StatusCode == HttpStatusCode.Unauthorized || response.StatusCode == HttpStatusCode.Forbidden ) - { - Console.WriteLine( "Encountered 401 for depot manifest {0} {1}. Aborting.", depot.id, depot.manifestId ); - break; - } - else - { - Console.WriteLine( "Encountered error downloading depot manifest {0} {1}: {2}", depot.id, depot.manifestId, response.StatusCode ); - } + Console.WriteLine( "Encountered 401 for depot manifest {0} {1}. Aborting.", depot.id, depot.manifestId ); + break; } else { - Console.WriteLine( "Encountered error downloading manifest for depot {0} {1}: {2}", depot.id, depot.manifestId, e.Status ); + Console.WriteLine( "Encountered error downloading depot manifest {0} {1}: {2}", depot.id, depot.manifestId, e.StatusCode ); } } catch ( Exception e ) { - cdnPool.ReturnBrokenConnection( client ); + cdnPool.ReturnBrokenConnection( connection ); Console.WriteLine( "Encountered error downloading manifest for depot {0} {1}: {2}", depot.id, depot.manifestId, e.Message ); } } @@ -716,7 +709,7 @@ namespace DepotDownloader } } - newProtoManifest.Files.Sort( ( x, y ) => { return x.FileName.CompareTo( y.FileName ); } ); + newProtoManifest.Files.Sort( ( x, y ) => string.Compare( x.FileName, y.FileName, StringComparison.Ordinal ) ); if ( Config.DownloadManifestOnly ) { @@ -894,10 +887,10 @@ namespace DepotDownloader while ( !cts.IsCancellationRequested ) { - CDNClient client; + Tuple connection; try { - client = await cdnPool.GetConnectionForDepotAsync( appId, depot.id, depot.depotKey, cts.Token ).ConfigureAwait( false ); + connection = cdnPool.GetConnectionForDepot( appId, depot.id, cts.Token ); } catch ( OperationCanceledException ) { @@ -913,36 +906,29 @@ namespace DepotDownloader try { - chunkData = await client.DownloadDepotChunkAsync( depot.id, data ).ConfigureAwait( false ); - cdnPool.ReturnConnection( client ); + chunkData = await cdnPool.CDNClient.DownloadDepotChunkAsync( depot.id, data, + connection.Item1, connection.Item2, depot.depotKey ).ConfigureAwait( false ); + cdnPool.ReturnConnection( connection ); break; } - catch ( WebException e ) + catch ( SteamKitWebRequestException e ) { - cdnPool.ReturnBrokenConnection( client ); + cdnPool.ReturnBrokenConnection( connection ); - if ( e.Status == WebExceptionStatus.ProtocolError ) + if ( e.StatusCode == HttpStatusCode.Unauthorized || e.StatusCode == HttpStatusCode.Forbidden ) { - var response = e.Response as HttpWebResponse; - if ( response.StatusCode == HttpStatusCode.Unauthorized || response.StatusCode == HttpStatusCode.Forbidden ) - { - Console.WriteLine( "Encountered 401 for chunk {0}. Aborting.", chunkID ); - cts.Cancel(); - break; - } - else - { - Console.WriteLine( "Encountered error downloading chunk {0}: {1}", chunkID, response.StatusCode ); - } + Console.WriteLine( "Encountered 401 for chunk {0}. Aborting.", chunkID ); + cts.Cancel(); + break; } else { - Console.WriteLine( "Encountered error downloading chunk {0}: {1}", chunkID, e.Status ); + Console.WriteLine( "Encountered error downloading chunk {0}: {1}", chunkID, e.StatusCode ); } } catch ( Exception e ) { - cdnPool.ReturnBrokenConnection( client ); + cdnPool.ReturnBrokenConnection( connection ); Console.WriteLine( "Encountered unexpected error downloading chunk {0}: {1}", chunkID, e.Message ); } } diff --git a/DepotDownloader/DepotDownloader.csproj b/DepotDownloader/DepotDownloader.csproj index 11400aee..cb9395df 100644 --- a/DepotDownloader/DepotDownloader.csproj +++ b/DepotDownloader/DepotDownloader.csproj @@ -6,6 +6,6 @@ - + \ No newline at end of file