Added CDN client pooling

pull/25/head
Ryan Kistner 9 years ago
parent 72c32a618e
commit 0869f85b93

@ -0,0 +1,235 @@
using SteamKit2;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace DepotDownloader
{
/// <summary>
/// CDNClientPool provides a pool of CDNClients to CDN endpoints
/// CDNClients that get re-used will be initialized for the correct depots
/// </summary>
class CDNClientPool
{
private static int ServerEndpointMinimumSize = 8;
private Steam3Session steamSession;
private ConcurrentBag<CDNClient> activeClientPool;
private ConcurrentDictionary<CDNClient, Tuple<uint, CDNClient.Server>> activeClientAuthed;
private BlockingCollection<CDNClient.Server> availableServerEndpoints;
private AutoResetEvent populatePoolEvent;
private Thread monitorThread;
public CDNClientPool(Steam3Session steamSession)
{
this.steamSession = steamSession;
activeClientPool = new ConcurrentBag<CDNClient>();
activeClientAuthed = new ConcurrentDictionary<CDNClient, Tuple<uint, CDNClient.Server>>();
availableServerEndpoints = new BlockingCollection<CDNClient.Server>();
populatePoolEvent = new AutoResetEvent(true);
monitorThread = new Thread(ConnectionPoolMonitor);
monitorThread.Name = "CDNClient Pool Monitor";
monitorThread.IsBackground = true;
monitorThread.Start();
}
private List<CDNClient.Server> fetchBootstrapServerList()
{
CDNClient bootstrap = new CDNClient(steamSession.steamClient);
while (true)
{
try
{
var cdnServers = bootstrap.FetchServerList(cellId: (uint)ContentDownloader.Config.CellID);
if (cdnServers != null)
{
return cdnServers;
}
}
catch (Exception)
{
DebugLog.WriteLine("CDNClientPool", "Failed to retrieve content server list");
}
}
}
private void ConnectionPoolMonitor()
{
while(true)
{
populatePoolEvent.WaitOne(TimeSpan.FromSeconds(1));
// peek ahead into steam session to see if we have servers
if (availableServerEndpoints.Count < ServerEndpointMinimumSize &&
steamSession.steamClient.IsConnected &&
steamSession.steamClient.GetServersOfType(EServerType.CS).Count > 0)
{
var servers = fetchBootstrapServerList();
var weightedCdnServers = servers.Select(x =>
{
int penalty = 0;
ConfigStore.TheConfig.ContentServerPenalty.TryGetValue(x.Host, out penalty);
return Tuple.Create(x, penalty);
}).OrderBy(x => x.Item2).ThenBy(x => x.Item1.WeightedLoad);
foreach (var endpoint in weightedCdnServers)
{
for (var i = 0; i < endpoint.Item1.NumEntries; i++) {
availableServerEndpoints.Add(endpoint.Item1);
}
}
}
}
}
private void releaseConnection(CDNClient client)
{
Tuple<uint, CDNClient.Server> authData;
activeClientAuthed.TryRemove(client, out authData);
}
private CDNClient buildConnection(uint depotId, byte[] depotKey, CDNClient.Server serverSeed, CancellationToken token)
{
CDNClient.Server server = null;
CDNClient client = null;
while (client == null)
{
// if we want to re-initialize a specific content server, try that one first
if (serverSeed != null)
{
server = serverSeed;
serverSeed = null;
}
else
{
if (availableServerEndpoints.Count < ServerEndpointMinimumSize)
{
populatePoolEvent.Set();
}
server = availableServerEndpoints.Take(token);
}
client = new CDNClient(steamSession.steamClient, steamSession.AppTickets[depotId]);
string cdnAuthToken = null;
if (server.Type == "CDN")
{
steamSession.RequestCDNAuthToken(depotId, server.Host);
cdnAuthToken = steamSession.CDNAuthTokens[Tuple.Create(depotId, server.Host)].Token;
}
try
{
client.Connect(server);
client.AuthenticateDepot(depotId, depotKey, cdnAuthToken);
}
catch (Exception ex)
{
client = null;
DebugLog.WriteLine("CDNClientPool", "Failed to connect to content server {0}: {1}", server, ex.Message);
int penalty = 0;
ConfigStore.TheConfig.ContentServerPenalty.TryGetValue(server.Host, out penalty);
ConfigStore.TheConfig.ContentServerPenalty[server.Host] = penalty + 1;
}
}
DebugLog.WriteLine("CDNClientPool", "Initialized connection to content server {0} with depot id {1}", server, depotId);
activeClientAuthed[client] = Tuple.Create(depotId, server);
return client;
}
private bool reauthConnection(CDNClient client, CDNClient.Server server, uint depotId, byte[] depotKey)
{
DebugLog.Assert(server.Type == "CDN" || steamSession.AppTickets[depotId] == null, "CDNClientPool", "Re-authing a CDN or anonymous connection");
String cdnAuthToken = null;
if (server.Type == "CDN")
{
steamSession.RequestCDNAuthToken(depotId, server.Host);
cdnAuthToken = steamSession.CDNAuthTokens[Tuple.Create(depotId, server.Host)].Token;
}
try
{
client.AuthenticateDepot(depotId, depotKey, cdnAuthToken);
activeClientAuthed[client] = Tuple.Create(depotId, server);
return true;
}
catch (Exception ex)
{
DebugLog.WriteLine("CDNClientPool", "Failed to reauth to content server {0}: {1}", server, ex.Message);
}
return false;
}
public CDNClient getConnectionForDepot(uint depotId, byte[] depotKey, CancellationToken token)
{
CDNClient client = null;
Tuple<uint, CDNClient.Server> authData;
activeClientPool.TryTake(out client);
// if we couldn't find a connection, make one now
if (client == null)
{
client = buildConnection(depotId, depotKey, null, token);
}
// if we couldn't find the authorization data or it's not authed to this depotid, re-initialize
if (!activeClientAuthed.TryGetValue(client, out authData) || authData.Item1 != depotId)
{
if (authData.Item2.Type == "CDN" && reauthConnection(client, authData.Item2, depotId, depotKey))
{
DebugLog.WriteLine("CDNClientPool", "Re-authed CDN connection to content server {0} from {1} to {2}", authData.Item2, authData.Item1, depotId);
}
else if (authData.Item2.Type == "CS" && steamSession.AppTickets[depotId] == null && reauthConnection(client, authData.Item2, depotId, depotKey))
{
DebugLog.WriteLine("CDNClientPool", "Re-authed anonymous connection to content server {0} from {1} to {2}", authData.Item2, authData.Item1, depotId);
}
else
{
releaseConnection(client);
client = buildConnection(depotId, depotKey, authData.Item2, token);
}
}
return client;
}
public void returnConnection(CDNClient client)
{
if (client == null) return;
activeClientPool.Add(client);
}
public void returnBrokenConnection(CDNClient client)
{
if (client == null) return;
releaseConnection(client);
}
}
}

@ -23,6 +23,7 @@ namespace DepotDownloader
private static Steam3Session steam3;
private static Steam3Session.Credentials steam3Credentials;
private static CDNClientPool cdnPool;
private const string DEFAULT_DOWNLOAD_DIR = "depots";
private const string CONFIG_DIR = ".DepotDownloader";
@ -316,6 +317,8 @@ namespace DepotDownloader
Console.WriteLine("Unable to get steam3 credentials.");
return;
}
cdnPool = new CDNClientPool(steam3);
}
public static void ShutdownSteam3()
@ -472,87 +475,6 @@ namespace DepotDownloader
public ProtoManifest.ChunkData NewChunk { get; private set; }
}
private static BlockingCollection<CDNClient> CollectCDNClientsForDepot(DepotDownloadInfo depot)
{
Console.WriteLine("Finding content servers...");
var cdnClients = new BlockingCollection<CDNClient>();
CDNClient initialClient = new CDNClient( steam3.steamClient, steam3.AppTickets[depot.id] );
List<CDNClient.Server> cdnServers = null;
while(true)
{
try
{
cdnServers = initialClient.FetchServerList( cellId: (uint)Config.CellID );
if (cdnServers != null) break;
}
catch (WebException)
{
Console.WriteLine("\nFailed to retrieve content server list.");
Thread.Sleep(500);
}
}
if (cdnServers == null)
{
Console.WriteLine("\nUnable to query any content servers for depot {0} - {1}", depot.id, depot.contentName);
return cdnClients;
}
var weightedCdnServers = cdnServers.Select(x =>
{
int penalty = 0;
ConfigStore.TheConfig.ContentServerPenalty.TryGetValue(x.Host, out penalty);
return Tuple.Create(x, penalty);
}).OrderBy(x => x.Item2).ThenBy(x => x.Item1.WeightedLoad);
// Grab up to the first eight server in the allegedly best-to-worst order from Steam
Parallel.ForEach(weightedCdnServers, new ParallelOptions { MaxDegreeOfParallelism = 2 }, (serverTuple, parallelLoop) =>
{
var server = serverTuple.Item1;
CDNClient c = new CDNClient( steam3.steamClient, steam3.AppTickets[ depot.id ] );
try
{
for (int i = 0; i < server.NumEntries; i++)
{
c.Connect(server);
string cdnAuthToken = null;
if (server.Type == "CDN")
{
steam3.RequestCDNAuthToken(depot.id, server.Host);
cdnAuthToken = steam3.CDNAuthTokens[Tuple.Create(depot.id, server.Host)].Token;
}
c.AuthenticateDepot(depot.id, depot.depotKey, cdnAuthToken);
cdnClients.Add(c);
}
if (cdnClients.Count >= Config.MaxServers) parallelLoop.Stop();
}
catch (Exception ex)
{
int penalty = 0;
ConfigStore.TheConfig.ContentServerPenalty.TryGetValue(server.Host, out penalty);
ConfigStore.TheConfig.ContentServerPenalty[server.Host] = penalty + 1;
Console.WriteLine("\nFailed to connect to content server {0}: {1}", server, ex.Message);
}
});
if (cdnClients.Count == 0)
{
Console.WriteLine("\nUnable to find any content servers for depot {0} - {1}", depot.id, depot.contentName);
}
Config.MaxDownloads = Math.Min(Config.MaxDownloads, cdnClients.Count);
return cdnClients;
}
private static void DownloadSteam3( List<DepotDownloadInfo> depots )
{
ulong TotalBytesCompressed = 0;
@ -565,8 +487,6 @@ namespace DepotDownloader
Console.WriteLine("Downloading depot {0} - {1}", depot.id, depot.contentName);
var cdnClientsLock = new Object();
BlockingCollection<CDNClient> cdnClients = null;
CancellationTokenSource cts = new CancellationTokenSource();
ProtoManifest oldProtoManifest = null;
@ -610,17 +530,43 @@ namespace DepotDownloader
DepotManifest depotManifest = null;
cdnClients = CollectCDNClientsForDepot(depot);
foreach (var c in cdnClients)
while (depotManifest == null)
{
try
CDNClient client = null;
try {
client = cdnPool.getConnectionForDepot(depot.id, depot.depotKey, CancellationToken.None);
depotManifest = client.DownloadManifest(depot.id, depot.manifestId);
cdnPool.returnConnection(client);
}
catch (WebException e)
{
depotManifest = c.DownloadManifest(depot.id, depot.manifestId);
break;
cdnPool.returnBrokenConnection(client);
if (e.Status == WebExceptionStatus.ProtocolError)
{
var response = e.Response as HttpWebResponse;
if (response.StatusCode == HttpStatusCode.Unauthorized || response.StatusCode == HttpStatusCode.Forbidden)
{
Console.WriteLine("Encountered 401 for depot manifest {0} {1}. Aborting.", depot.id, depot.manifestId);
break;
}
else
{
Console.WriteLine("Encountered error downloading depot manifest {0} {1}: {2}", depot.id, depot.manifestId, response.StatusCode);
}
}
else
{
Console.WriteLine("Encountered error downloading manifest for depot {0} {1}: {2}", depot.id, depot.manifestId, e.Status);
}
}
catch (Exception e)
{
cdnPool.returnBrokenConnection(client);
Console.WriteLine("Encountered error downloading manifest for depot {0} {1}: {2}", depot.id, depot.manifestId, e.Message);
}
catch (WebException) { }
catch (SocketException) { }
}
if (depotManifest == null)
@ -794,18 +740,6 @@ namespace DepotDownloader
}
}
if (neededChunks.Count > 0 && cdnClients == null)
{
// If we didn't need to connect to get manifests, connect now.
lock (cdnClientsLock)
{
if (cdnClients == null)
{
cdnClients = CollectCDNClientsForDepot(depot);
}
}
}
foreach (var chunk in neededChunks)
{
if (cts.IsCancellationRequested) break;
@ -818,7 +752,7 @@ namespace DepotDownloader
CDNClient client;
try
{
client = cdnClients.Take(cts.Token);
client = cdnPool.getConnectionForDepot(depot.id, depot.depotKey, cts.Token);
}
catch (OperationCanceledException)
{
@ -835,10 +769,13 @@ namespace DepotDownloader
try
{
chunkData = client.DownloadDepotChunk(depot.id, data);
cdnPool.returnConnection(client);
break;
}
catch (WebException e)
{
cdnPool.returnBrokenConnection(client);
if (e.Status == WebExceptionStatus.ProtocolError)
{
var response = e.Response as HttpWebResponse;
@ -857,18 +794,12 @@ namespace DepotDownloader
{
Console.WriteLine("Encountered error downloading chunk {0}: {1}", chunkID, e.Status);
}
// let client "cool off" before re-adding it to the queue
Thread.Sleep(500);
}
catch (Exception e)
{
cdnPool.returnBrokenConnection(client);
Console.WriteLine("Encountered unexpected error downloading chunk {0}: {1}", chunkID, e.Message);
}
finally
{
cdnClients.Add(client);
}
}
if (chunkData == null)

@ -80,6 +80,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="CDNClientPool.cs" />
<Compile Include="ContentDownloader.cs" />
<Compile Include="ConfigStore.cs" />
<Compile Include="DownloadConfig.cs" />

Loading…
Cancel
Save