@ -3,36 +3,40 @@ using System;
using System.Collections.Concurrent ;
using System.Collections.Generic ;
using System.Linq ;
using System.Net ;
using System.Threading ;
using System.Threading.Tasks ;
namespace DepotDownloader
{
/// <summary>
/// CDNClientPool provides a pool of CDNClients to CDN endpoints
/// CDNClients that get re-used will be initialized for the correct depots
/// CDNClientPool provides a pool of connections to CDN endpoints, requesting CDN tokens as needed
/// </summary>
class CDNClientPool
{
private const int ServerEndpointMinimumSize = 8 ;
private Steam3Session steamSession ;
private readonly Steam3Session steamSession ;
private ConcurrentBag < CDNClient > activeClientPool ;
private ConcurrentDictionary < CDNClient , Tuple < uint , CDNClient . Server > > activeClientAuthed ;
private BlockingCollection < CDNClient . Server > availableServerEndpoints ;
public CDNClient CDNClient { get ; }
private AutoResetEvent populatePoolEvent ;
private Task monitorTask ;
private CancellationTokenSource shutdownToken ;
private readonly ConcurrentDictionary < Tuple < uint , string > , string > depotCdnKeys ;
private readonly ConcurrentBag < CDNClient . Server > activeConnectionPool ;
private readonly BlockingCollection < CDNClient . Server > availableServerEndpoints ;
private readonly AutoResetEvent populatePoolEvent ;
private readonly Task monitorTask ;
private readonly CancellationTokenSource shutdownToken ;
public CancellationTokenSource ExhaustedToken { get ; set ; }
public CDNClientPool ( Steam3Session steamSession )
{
this . steamSession = steamSession ;
CDNClient = new CDNClient ( steamSession . steamClient ) ;
depotCdnKeys = new ConcurrentDictionary < Tuple < uint , string > , string > ( ) ;
activeClientPool = new ConcurrentBag < CDNClient > ( ) ;
activeClientAuthed = new ConcurrentDictionary < CDNClient , Tuple < uint , CDNClient . Server > > ( ) ;
activeConnectionPool = new ConcurrentBag < CDNClient . Server > ( ) ;
availableServerEndpoints = new BlockingCollection < CDNClient . Server > ( ) ;
populatePoolEvent = new AutoResetEvent ( true ) ;
@ -44,10 +48,13 @@ namespace DepotDownloader
public void Shutdown ( )
{
shutdownToken . Cancel ( ) ;
monitorTask . Wait ( ) ;
}
private async Task < IReadOnlyCollection < CDNClient . Server > > FetchBootstrapServerListAsync ( )
{
var backoffDelay = 0 ;
while ( ! shutdownToken . IsCancellationRequested )
{
try
@ -61,6 +68,13 @@ namespace DepotDownloader
catch ( Exception ex )
{
Console . WriteLine ( "Failed to retrieve content server list: {0}" , ex . Message ) ;
if ( ex is SteamKitWebRequestException e & & e . StatusCode = = ( HttpStatusCode ) 429 )
{
// If we're being throttled, add a delay to the next request
backoffDelay = Math . Min ( 5 , + + backoffDelay ) ;
await Task . Delay ( TimeSpan . FromSeconds ( backoffDelay ) ) ;
}
}
}
@ -76,8 +90,7 @@ namespace DepotDownloader
populatePoolEvent . WaitOne ( TimeSpan . FromSeconds ( 1 ) ) ;
// We want the Steam session so we can take the CellID from the session and pass it through to the ContentServer Directory Service
if ( availableServerEndpoints . Count < ServerEndpointMinimumSize & &
steamSession . steamClient . IsConnected )
if ( availableServerEndpoints . Count < ServerEndpointMinimumSize & & steamSession . steamClient . IsConnected )
{
var servers = await FetchBootstrapServerListAsync ( ) . ConfigureAwait ( false ) ;
@ -89,22 +102,22 @@ namespace DepotDownloader
var weightedCdnServers = servers . Select ( x = >
{
int penalty = 0 ;
AccountSettingsStore . Instance . ContentServerPenalty . TryGetValue ( x . Host , out penalty ) ;
AccountSettingsStore . Instance . ContentServerPenalty . TryGetValue ( x . Host , out var penalty ) ;
return Tuple . Create ( x , penalty ) ;
} ) . OrderBy ( x = > x . Item2 ) . ThenBy ( x = > x . Item1 . WeightedLoad ) ;
foreach ( var endpoint in weightedCdnServers )
foreach ( var ( server , weight ) in weightedCdnServers )
{
for ( var i = 0 ; i < endpoint . Item1 . NumEntries ; i + + ) {
availableServerEndpoints . Add ( endpoint . Item1 ) ;
for ( var i = 0 ; i < server . NumEntries ; i + + )
{
availableServerEndpoints . Add ( server ) ;
}
}
didPopulate = true ;
}
else if ( availableServerEndpoints . Count = = 0 & & ! steamSession . steamClient . IsConnected & & didPopulate )
}
else if ( availableServerEndpoints . Count = = 0 & & ! steamSession . steamClient . IsConnected & & didPopulate )
{
ExhaustedToken ? . Cancel ( ) ;
return ;
@ -112,163 +125,65 @@ namespace DepotDownloader
}
}
private void ReleaseConnection ( CDNClient client )
private string AuthenticateConnection ( uint appId , uint depotId , CDNClient . Server server )
{
Tuple < uint , CDNClient . Server > authData ;
activeClientAuthed . TryRemove ( client , out authData ) ;
}
steamSession . RequestCDNAuthToken ( appId , depotId , server . Host ) ;
private async Task < CDNClient > BuildConnectionAsync ( uint appId , uint depotId , byte [ ] depotKey , CDNClient . Server serverSeed , CancellationToken token )
{
CDNClient . Server server = null ;
CDNClient client = null ;
var cdnKey = $"{depotId:D}:{steamSession.ResolveCDNTopLevelHost(server.Host)}" ;
while ( client = = null )
if ( steamSession . CDNAuthTokens . TryGetValue ( cdnKey , out var authTokenCallback ) )
{
// if we want to re-initialize a specific content server, try that one first
if ( serverSeed ! = null )
{
server = serverSeed ;
serverSeed = null ;
}
else
{
if ( availableServerEndpoints . Count < ServerEndpointMinimumSize )
{
populatePoolEvent . Set ( ) ;
}
server = availableServerEndpoints . Take ( token ) ;
}
client = new CDNClient ( steamSession . steamClient , steamSession . AppTickets [ depotId ] ) ;
string cdnAuthToken = null ;
try
{
if ( server . Type = = "CDN" | | server . Type = = "SteamCache" )
{
steamSession . RequestCDNAuthToken ( appId , depotId , server . Host ) ;
var cdnKey = string . Format ( "{0:D}:{1}" , depotId , steamSession . ResolveCDNTopLevelHost ( server . Host ) ) ;
SteamApps . CDNAuthTokenCallback authTokenCallback ;
if ( steamSession . CDNAuthTokens . TryGetValue ( cdnKey , out authTokenCallback ) )
{
cdnAuthToken = authTokenCallback . Token ;
}
else
{
throw new Exception ( String . Format ( "Failed to retrieve CDN token for server {0} depot {1}" , server . Host , depotId ) ) ;
}
}
await client . ConnectAsync ( server ) . ConfigureAwait ( false ) ;
await client . AuthenticateDepotAsync ( depotId , depotKey , cdnAuthToken ) . ConfigureAwait ( false ) ;
}
catch ( Exception ex )
{
client = null ;
Console . WriteLine ( "Failed to connect to content server {0}: {1}" , server , ex . Message ) ;
int penalty = 0 ;
AccountSettingsStore . Instance . ContentServerPenalty . TryGetValue ( server . Host , out penalty ) ;
AccountSettingsStore . Instance . ContentServerPenalty [ server . Host ] = penalty + 1 ;
}
return authTokenCallback . Token ;
}
else
{
throw new Exception ( $"Failed to retrieve CDN token for server {server.Host} depot {depotId}" ) ;
}
Console . WriteLine ( "Initialized connection to content server {0} with depot id {1}" , server , depotId ) ;
activeClientAuthed [ client ] = Tuple . Create ( depotId , server ) ;
return client ;
}
private async Task < bool > ReauthConnectionAsync ( CDNClient client , CDNClient . Server server , uint appId , uint depotId , byte [ ] depotKey )
private CDNClient . Server BuildConnection ( CancellationToken token )
{
DebugLog . Assert ( server . Type = = "CDN" | | server . Type = = "SteamCache" | | steamSession . AppTickets [ depotId ] = = null , "CDNClientPool" , "Re-authing a CDN or anonymous connection" ) ;
String cdnAuthToken = null ;
try
{
if ( server . Type = = "CDN" | | server . Type = = "SteamCache" )
{
steamSession . RequestCDNAuthToken ( appId , depotId , server . Host ) ;
var cdnKey = string . Format ( "{0:D}:{1}" , depotId , steamSession . ResolveCDNTopLevelHost ( server . Host ) ) ;
SteamApps . CDNAuthTokenCallback authTokenCallback ;
if ( steamSession . CDNAuthTokens . TryGetValue ( cdnKey , out authTokenCallback ) )
{
cdnAuthToken = authTokenCallback . Token ;
}
else
{
throw new Exception ( String . Format ( "Failed to retrieve CDN token for server {0} depot {1}" , server . Host , depotId ) ) ;
}
}
await client . AuthenticateDepotAsync ( depotId , depotKey , cdnAuthToken ) . ConfigureAwait ( false ) ;
activeClientAuthed [ client ] = Tuple . Create ( depotId , server ) ;
return true ;
}
catch ( Exception ex )
if ( availableServerEndpoints . Count < ServerEndpointMinimumSize )
{
Console. WriteLine ( "Failed to reauth to content server {0}: {1}" , server , ex . Message ) ;
populatePoolEvent . Set ( ) ;
}
return false ;
return availableServerEndpoints . Take ( token ) ;
}
public async Task < CDNClient > GetConnectionForDepotAsync ( uint appId , uint depotId , byte [ ] depotKey , CancellationToken token )
public Tuple < CDNClient . Server , string > GetConnectionForDepot ( uint appId , uint depotId , CancellationToken token )
{
CDNClient client = null ;
Tuple < uint , CDNClient . Server > authData ;
activeClientPool . TryTake ( out client ) ;
// if we couldn't find a connection, make one now
if ( client = = null )
// Take a free connection from the connection pool
// If there were no free connections, create a new one from the server list
if ( ! activeConnectionPool . TryTake ( out var server ) )
{
client = await BuildConnectionAsync ( appId , depotId , depotKey , null , token ) . ConfigureAwait ( false ) ;
server = BuildConnection ( token ) ;
}
// if we couldn't find the authorization data or it's not authed to this depotid, re-initialize
if ( ! activeClientAuthed . TryGetValue ( client , out authData ) | | authData . Item1 ! = depotId )
var topLevelHost = steamSession . ResolveCDNTopLevelHost ( server . Host ) ;
var depotKey = Tuple . Create ( depotId , topLevelHost ) ;
// If we don't have a CDN token yet for this server and depot, fetch one now
if ( ! depotCdnKeys . TryGetValue ( depotKey , out var cdnToken ) )
{
if ( ( authData . Item2 . Type = = "CDN" | | authData . Item2 . Type = = "SteamCache" ) & & await ReauthConnectionAsync ( client , authData . Item2 , appId , depotId , depotKey ) . ConfigureAwait ( false ) )
{
Console . WriteLine ( "Re-authed CDN connection to content server {0} from {1} to {2}" , authData . Item2 , authData . Item1 , depotId ) ;
}
else if ( authData . Item2 . Type = = "CS" & & steamSession . AppTickets [ depotId ] = = null & & await ReauthConnectionAsync ( client , authData . Item2 , appId , depotId , depotKey ) . ConfigureAwait ( false ) )
{
Console . WriteLine ( "Re-authed anonymous connection to content server {0} from {1} to {2}" , authData . Item2 , authData . Item1 , depotId ) ;
}
else
{
ReleaseConnection ( client ) ;
client = await BuildConnectionAsync ( appId , depotId , depotKey , authData . Item2 , token ) . ConfigureAwait ( false ) ;
}
depotCdnKeys [ depotKey ] = cdnToken = AuthenticateConnection ( appId , depotId , server ) ;
}
return client ;
return Tuple . Create ( server , cdnToken ) ;
}
public void ReturnConnection ( CDNClient client )
public void ReturnConnection ( Tuple < CDNClient . Server , string > server )
{
if ( client = = null ) return ;
if ( server = = null ) return ;
activeC lientPool. Add ( client ) ;
activeConnectionPool . Add ( server . Item1 ) ;
}
public void ReturnBrokenConnection ( CDNClient client )
public void ReturnBrokenConnection ( Tuple < CDNClient . Server , string > server )
{
if ( client = = null ) return ;
if ( server = = null ) return ;
ReleaseConnection ( client ) ;
// Broken connections are not returned to the pool
}
}
}