Updated to SteamKit 2.3.0

pull/85/head DepotDownloader_2.3.4
Ryan Kistner 6 years ago
parent 642f43864a
commit 7476cdb0a3

@ -3,36 +3,40 @@ using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Net;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace DepotDownloader namespace DepotDownloader
{ {
/// <summary> /// <summary>
/// CDNClientPool provides a pool of CDNClients to CDN endpoints /// CDNClientPool provides a pool of connections to CDN endpoints, requesting CDN tokens as needed
/// CDNClients that get re-used will be initialized for the correct depots
/// </summary> /// </summary>
class CDNClientPool class CDNClientPool
{ {
private const int ServerEndpointMinimumSize = 8; private const int ServerEndpointMinimumSize = 8;
private Steam3Session steamSession; private readonly Steam3Session steamSession;
private ConcurrentBag<CDNClient> activeClientPool; public CDNClient CDNClient { get; }
private ConcurrentDictionary<CDNClient, Tuple<uint, CDNClient.Server>> activeClientAuthed;
private BlockingCollection<CDNClient.Server> availableServerEndpoints;
private AutoResetEvent populatePoolEvent; private readonly ConcurrentDictionary<Tuple<uint, string>, string> depotCdnKeys;
private Task monitorTask;
private CancellationTokenSource shutdownToken; private readonly ConcurrentBag<CDNClient.Server> activeConnectionPool;
private readonly BlockingCollection<CDNClient.Server> availableServerEndpoints;
private readonly AutoResetEvent populatePoolEvent;
private readonly Task monitorTask;
private readonly CancellationTokenSource shutdownToken;
public CancellationTokenSource ExhaustedToken { get; set; } public CancellationTokenSource ExhaustedToken { get; set; }
public CDNClientPool(Steam3Session steamSession) public CDNClientPool(Steam3Session steamSession)
{ {
this.steamSession = steamSession; this.steamSession = steamSession;
CDNClient = new CDNClient(steamSession.steamClient);
depotCdnKeys = new ConcurrentDictionary<Tuple<uint, string>, string>();
activeClientPool = new ConcurrentBag<CDNClient>(); activeConnectionPool = new ConcurrentBag<CDNClient.Server>();
activeClientAuthed = new ConcurrentDictionary<CDNClient, Tuple<uint, CDNClient.Server>>();
availableServerEndpoints = new BlockingCollection<CDNClient.Server>(); availableServerEndpoints = new BlockingCollection<CDNClient.Server>();
populatePoolEvent = new AutoResetEvent(true); populatePoolEvent = new AutoResetEvent(true);
@ -44,10 +48,13 @@ namespace DepotDownloader
public void Shutdown() public void Shutdown()
{ {
shutdownToken.Cancel(); shutdownToken.Cancel();
monitorTask.Wait();
} }
private async Task<IReadOnlyCollection<CDNClient.Server>> FetchBootstrapServerListAsync() private async Task<IReadOnlyCollection<CDNClient.Server>> FetchBootstrapServerListAsync()
{ {
var backoffDelay = 0;
while (!shutdownToken.IsCancellationRequested) while (!shutdownToken.IsCancellationRequested)
{ {
try try
@ -61,6 +68,13 @@ namespace DepotDownloader
catch (Exception ex) catch (Exception ex)
{ {
Console.WriteLine("Failed to retrieve content server list: {0}", ex.Message); Console.WriteLine("Failed to retrieve content server list: {0}", ex.Message);
if (ex is SteamKitWebRequestException e && e.StatusCode == (HttpStatusCode)429)
{
// If we're being throttled, add a delay to the next request
backoffDelay = Math.Min(5, ++backoffDelay);
await Task.Delay(TimeSpan.FromSeconds(backoffDelay));
}
} }
} }
@ -76,8 +90,7 @@ namespace DepotDownloader
populatePoolEvent.WaitOne(TimeSpan.FromSeconds(1)); populatePoolEvent.WaitOne(TimeSpan.FromSeconds(1));
// We want the Steam session so we can take the CellID from the session and pass it through to the ContentServer Directory Service // We want the Steam session so we can take the CellID from the session and pass it through to the ContentServer Directory Service
if (availableServerEndpoints.Count < ServerEndpointMinimumSize && if (availableServerEndpoints.Count < ServerEndpointMinimumSize && steamSession.steamClient.IsConnected)
steamSession.steamClient.IsConnected)
{ {
var servers = await FetchBootstrapServerListAsync().ConfigureAwait(false); var servers = await FetchBootstrapServerListAsync().ConfigureAwait(false);
@ -89,22 +102,22 @@ namespace DepotDownloader
var weightedCdnServers = servers.Select(x => var weightedCdnServers = servers.Select(x =>
{ {
int penalty = 0; AccountSettingsStore.Instance.ContentServerPenalty.TryGetValue(x.Host, out var penalty);
AccountSettingsStore.Instance.ContentServerPenalty.TryGetValue(x.Host, out penalty);
return Tuple.Create(x, penalty); return Tuple.Create(x, penalty);
}).OrderBy(x => x.Item2).ThenBy(x => x.Item1.WeightedLoad); }).OrderBy(x => x.Item2).ThenBy(x => x.Item1.WeightedLoad);
foreach (var endpoint in weightedCdnServers) foreach (var (server, weight) in weightedCdnServers)
{ {
for (var i = 0; i < endpoint.Item1.NumEntries; i++) { for (var i = 0; i < server.NumEntries; i++)
availableServerEndpoints.Add(endpoint.Item1); {
availableServerEndpoints.Add(server);
} }
} }
didPopulate = true; didPopulate = true;
} }
else if ( availableServerEndpoints.Count == 0 && !steamSession.steamClient.IsConnected && didPopulate ) else if (availableServerEndpoints.Count == 0 && !steamSession.steamClient.IsConnected && didPopulate)
{ {
ExhaustedToken?.Cancel(); ExhaustedToken?.Cancel();
return; return;
@ -112,163 +125,65 @@ namespace DepotDownloader
} }
} }
private void ReleaseConnection(CDNClient client) private string AuthenticateConnection(uint appId, uint depotId, CDNClient.Server server)
{ {
Tuple<uint, CDNClient.Server> authData; steamSession.RequestCDNAuthToken(appId, depotId, server.Host);
activeClientAuthed.TryRemove(client, out authData);
}
private async Task<CDNClient> BuildConnectionAsync(uint appId, uint depotId, byte[] depotKey, CDNClient.Server serverSeed, CancellationToken token) var cdnKey = $"{depotId:D}:{steamSession.ResolveCDNTopLevelHost(server.Host)}";
{
CDNClient.Server server = null;
CDNClient client = null;
while (client == null) if (steamSession.CDNAuthTokens.TryGetValue(cdnKey, out var authTokenCallback))
{ {
// if we want to re-initialize a specific content server, try that one first return authTokenCallback.Token;
if (serverSeed != null) }
{ else
server = serverSeed; {
serverSeed = null; throw new Exception($"Failed to retrieve CDN token for server {server.Host} depot {depotId}");
}
else
{
if (availableServerEndpoints.Count < ServerEndpointMinimumSize)
{
populatePoolEvent.Set();
}
server = availableServerEndpoints.Take(token);
}
client = new CDNClient(steamSession.steamClient, steamSession.AppTickets[depotId]);
string cdnAuthToken = null;
try
{
if (server.Type == "CDN" || server.Type == "SteamCache")
{
steamSession.RequestCDNAuthToken(appId, depotId, server.Host);
var cdnKey = string.Format("{0:D}:{1}", depotId, steamSession.ResolveCDNTopLevelHost(server.Host));
SteamApps.CDNAuthTokenCallback authTokenCallback;
if (steamSession.CDNAuthTokens.TryGetValue(cdnKey, out authTokenCallback))
{
cdnAuthToken = authTokenCallback.Token;
}
else
{
throw new Exception(String.Format("Failed to retrieve CDN token for server {0} depot {1}", server.Host, depotId));
}
}
await client.ConnectAsync(server).ConfigureAwait(false);
await client.AuthenticateDepotAsync(depotId, depotKey, cdnAuthToken).ConfigureAwait(false);
}
catch (Exception ex)
{
client = null;
Console.WriteLine("Failed to connect to content server {0}: {1}", server, ex.Message);
int penalty = 0;
AccountSettingsStore.Instance.ContentServerPenalty.TryGetValue(server.Host, out penalty);
AccountSettingsStore.Instance.ContentServerPenalty[server.Host] = penalty + 1;
}
} }
Console.WriteLine("Initialized connection to content server {0} with depot id {1}", server, depotId);
activeClientAuthed[client] = Tuple.Create(depotId, server);
return client;
} }
private async Task<bool> ReauthConnectionAsync(CDNClient client, CDNClient.Server server, uint appId, uint depotId, byte[] depotKey) private CDNClient.Server BuildConnection(CancellationToken token)
{ {
DebugLog.Assert(server.Type == "CDN" || server.Type == "SteamCache" || steamSession.AppTickets[depotId] == null, "CDNClientPool", "Re-authing a CDN or anonymous connection"); if (availableServerEndpoints.Count < ServerEndpointMinimumSize)
String cdnAuthToken = null;
try
{ {
if (server.Type == "CDN" || server.Type == "SteamCache") populatePoolEvent.Set();
{
steamSession.RequestCDNAuthToken(appId, depotId, server.Host);
var cdnKey = string.Format("{0:D}:{1}", depotId, steamSession.ResolveCDNTopLevelHost(server.Host));
SteamApps.CDNAuthTokenCallback authTokenCallback;
if (steamSession.CDNAuthTokens.TryGetValue(cdnKey, out authTokenCallback))
{
cdnAuthToken = authTokenCallback.Token;
}
else
{
throw new Exception(String.Format("Failed to retrieve CDN token for server {0} depot {1}", server.Host, depotId));
}
}
await client.AuthenticateDepotAsync(depotId, depotKey, cdnAuthToken).ConfigureAwait(false);
activeClientAuthed[client] = Tuple.Create(depotId, server);
return true;
}
catch (Exception ex)
{
Console.WriteLine("Failed to reauth to content server {0}: {1}", server, ex.Message);
} }
return false; return availableServerEndpoints.Take(token);
} }
public async Task<CDNClient> GetConnectionForDepotAsync(uint appId, uint depotId, byte[] depotKey, CancellationToken token) public Tuple<CDNClient.Server, string> GetConnectionForDepot(uint appId, uint depotId, CancellationToken token)
{ {
CDNClient client = null; // Take a free connection from the connection pool
// If there were no free connections, create a new one from the server list
Tuple<uint, CDNClient.Server> authData; if (!activeConnectionPool.TryTake(out var server))
activeClientPool.TryTake(out client);
// if we couldn't find a connection, make one now
if (client == null)
{ {
client = await BuildConnectionAsync(appId, depotId, depotKey, null, token).ConfigureAwait(false); server = BuildConnection(token);
} }
// if we couldn't find the authorization data or it's not authed to this depotid, re-initialize var topLevelHost = steamSession.ResolveCDNTopLevelHost(server.Host);
if (!activeClientAuthed.TryGetValue(client, out authData) || authData.Item1 != depotId) var depotKey = Tuple.Create(depotId, topLevelHost);
// If we don't have a CDN token yet for this server and depot, fetch one now
if (!depotCdnKeys.TryGetValue(depotKey, out var cdnToken))
{ {
if ((authData.Item2.Type == "CDN" || authData.Item2.Type == "SteamCache") && await ReauthConnectionAsync(client, authData.Item2, appId, depotId, depotKey).ConfigureAwait(false)) depotCdnKeys[depotKey] = cdnToken = AuthenticateConnection(appId, depotId, server);
{
Console.WriteLine("Re-authed CDN connection to content server {0} from {1} to {2}", authData.Item2, authData.Item1, depotId);
}
else if (authData.Item2.Type == "CS" && steamSession.AppTickets[depotId] == null && await ReauthConnectionAsync(client, authData.Item2, appId, depotId, depotKey).ConfigureAwait(false))
{
Console.WriteLine("Re-authed anonymous connection to content server {0} from {1} to {2}", authData.Item2, authData.Item1, depotId);
}
else
{
ReleaseConnection(client);
client = await BuildConnectionAsync(appId, depotId, depotKey, authData.Item2, token).ConfigureAwait(false);
}
} }
return client; return Tuple.Create(server, cdnToken);
} }
public void ReturnConnection(CDNClient client) public void ReturnConnection(Tuple<CDNClient.Server, string> server)
{ {
if (client == null) return; if (server == null) return;
activeClientPool.Add(client); activeConnectionPool.Add(server.Item1);
} }
public void ReturnBrokenConnection(CDNClient client) public void ReturnBrokenConnection(Tuple<CDNClient.Server, string> server)
{ {
if (client == null) return; if (server == null) return;
ReleaseConnection(client); // Broken connections are not returned to the pool
} }
} }
} }

@ -662,40 +662,33 @@ namespace DepotDownloader
while ( depotManifest == null ) while ( depotManifest == null )
{ {
CDNClient client = null; Tuple<CDNClient.Server, string> connection = null;
try try
{ {
client = await cdnPool.GetConnectionForDepotAsync( appId, depot.id, depot.depotKey, CancellationToken.None ).ConfigureAwait( false ); connection = cdnPool.GetConnectionForDepot( appId, depot.id, CancellationToken.None );
depotManifest = await client.DownloadManifestAsync( depot.id, depot.manifestId ).ConfigureAwait( false ); depotManifest = await cdnPool.CDNClient.DownloadManifestAsync( depot.id, depot.manifestId,
connection.Item1, connection.Item2, depot.depotKey ).ConfigureAwait(false);
cdnPool.ReturnConnection( client ); cdnPool.ReturnConnection( connection );
} }
catch ( WebException e ) catch ( SteamKitWebRequestException e )
{ {
cdnPool.ReturnBrokenConnection( client ); cdnPool.ReturnBrokenConnection( connection );
if ( e.Status == WebExceptionStatus.ProtocolError ) if ( e.StatusCode == HttpStatusCode.Unauthorized || e.StatusCode == HttpStatusCode.Forbidden )
{ {
var response = e.Response as HttpWebResponse; Console.WriteLine( "Encountered 401 for depot manifest {0} {1}. Aborting.", depot.id, depot.manifestId );
if ( response.StatusCode == HttpStatusCode.Unauthorized || response.StatusCode == HttpStatusCode.Forbidden ) break;
{
Console.WriteLine( "Encountered 401 for depot manifest {0} {1}. Aborting.", depot.id, depot.manifestId );
break;
}
else
{
Console.WriteLine( "Encountered error downloading depot manifest {0} {1}: {2}", depot.id, depot.manifestId, response.StatusCode );
}
} }
else else
{ {
Console.WriteLine( "Encountered error downloading manifest for depot {0} {1}: {2}", depot.id, depot.manifestId, e.Status ); Console.WriteLine( "Encountered error downloading depot manifest {0} {1}: {2}", depot.id, depot.manifestId, e.StatusCode );
} }
} }
catch ( Exception e ) catch ( Exception e )
{ {
cdnPool.ReturnBrokenConnection( client ); cdnPool.ReturnBrokenConnection( connection );
Console.WriteLine( "Encountered error downloading manifest for depot {0} {1}: {2}", depot.id, depot.manifestId, e.Message ); Console.WriteLine( "Encountered error downloading manifest for depot {0} {1}: {2}", depot.id, depot.manifestId, e.Message );
} }
} }
@ -716,7 +709,7 @@ namespace DepotDownloader
} }
} }
newProtoManifest.Files.Sort( ( x, y ) => { return x.FileName.CompareTo( y.FileName ); } ); newProtoManifest.Files.Sort( ( x, y ) => string.Compare( x.FileName, y.FileName, StringComparison.Ordinal ) );
if ( Config.DownloadManifestOnly ) if ( Config.DownloadManifestOnly )
{ {
@ -894,10 +887,10 @@ namespace DepotDownloader
while ( !cts.IsCancellationRequested ) while ( !cts.IsCancellationRequested )
{ {
CDNClient client; Tuple<CDNClient.Server, string> connection;
try try
{ {
client = await cdnPool.GetConnectionForDepotAsync( appId, depot.id, depot.depotKey, cts.Token ).ConfigureAwait( false ); connection = cdnPool.GetConnectionForDepot( appId, depot.id, cts.Token );
} }
catch ( OperationCanceledException ) catch ( OperationCanceledException )
{ {
@ -913,36 +906,29 @@ namespace DepotDownloader
try try
{ {
chunkData = await client.DownloadDepotChunkAsync( depot.id, data ).ConfigureAwait( false ); chunkData = await cdnPool.CDNClient.DownloadDepotChunkAsync( depot.id, data,
cdnPool.ReturnConnection( client ); connection.Item1, connection.Item2, depot.depotKey ).ConfigureAwait( false );
cdnPool.ReturnConnection( connection );
break; break;
} }
catch ( WebException e ) catch ( SteamKitWebRequestException e )
{ {
cdnPool.ReturnBrokenConnection( client ); cdnPool.ReturnBrokenConnection( connection );
if ( e.Status == WebExceptionStatus.ProtocolError ) if ( e.StatusCode == HttpStatusCode.Unauthorized || e.StatusCode == HttpStatusCode.Forbidden )
{ {
var response = e.Response as HttpWebResponse; Console.WriteLine( "Encountered 401 for chunk {0}. Aborting.", chunkID );
if ( response.StatusCode == HttpStatusCode.Unauthorized || response.StatusCode == HttpStatusCode.Forbidden ) cts.Cancel();
{ break;
Console.WriteLine( "Encountered 401 for chunk {0}. Aborting.", chunkID );
cts.Cancel();
break;
}
else
{
Console.WriteLine( "Encountered error downloading chunk {0}: {1}", chunkID, response.StatusCode );
}
} }
else else
{ {
Console.WriteLine( "Encountered error downloading chunk {0}: {1}", chunkID, e.Status ); Console.WriteLine( "Encountered error downloading chunk {0}: {1}", chunkID, e.StatusCode );
} }
} }
catch ( Exception e ) catch ( Exception e )
{ {
cdnPool.ReturnBrokenConnection( client ); cdnPool.ReturnBrokenConnection( connection );
Console.WriteLine( "Encountered unexpected error downloading chunk {0}: {1}", chunkID, e.Message ); Console.WriteLine( "Encountered unexpected error downloading chunk {0}: {1}", chunkID, e.Message );
} }
} }

@ -6,6 +6,6 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="protobuf-net" Version="2.4.4" /> <PackageReference Include="protobuf-net" Version="2.4.4" />
<PackageReference Include="SteamKit2" Version="2.2.0" /> <PackageReference Include="SteamKit2" Version="2.3.0-Beta.1" />
</ItemGroup> </ItemGroup>
</Project> </Project>
Loading…
Cancel
Save