Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RIN.InternalAPI/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public static void Main(string[] args)

builder.Services.AddSingleton<DB>();
builder.Services.AddSingleton<SDB>();
builder.Services.AddSingleton<DbEventBus>();
builder.Services.AddHostedService(provider => provider.GetRequiredService<DbEventBus>());

var app = builder.Build();
app.UseSerilogRequestLogging();
Expand Down
164 changes: 164 additions & 0 deletions RIN.InternalAPI/Services/DbEventBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Channels;
using Npgsql;
using RIN.Core.DB;
using RIN.InternalAPI.Models;

namespace RIN.InternalAPI.Services
{
public class DbEventBus : BackgroundService
{
private readonly string ConnStr;
private readonly DB Db;
private readonly ILogger<DbEventBus> Logger;
private readonly ConcurrentDictionary<Guid, Channel<Event>> Subscribers = new();
private readonly Channel<Event> InternalChannel = Channel.CreateUnbounded<Event>();
private static readonly ConcurrentDictionary<string, Type?> EventTypeCache = new();

public DbEventBus(DB db, ILogger<DbEventBus> logger)
{
Db = db;
ConnStr = db.ConnStr;
Logger = logger;
}

public Guid Subscribe(Channel<Event> channel)
{
var id = Guid.NewGuid();
Subscribers.TryAdd(id, channel);
return id;
}

public void Unsubscribe(Guid id) => Subscribers.TryRemove(id, out _);

protected override async Task ExecuteAsync(CancellationToken ct)
{
try
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
var listenTask = ListenToPostgres(cts.Token);
var processTask = ProcessEventsAsync(cts.Token);

await Task.WhenAny(listenTask, processTask);
await cts.CancelAsync();
await Task.WhenAll(listenTask, processTask);
}
catch (Exception ex) when (!ct.IsCancellationRequested)
{
Logger.LogError(ex, ex.Message);
}
}

private async Task ListenToPostgres(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
await using var conn = new NpgsqlConnection(ConnStr);
await conn.OpenAsync(ct);
conn.Notification += (_, e) => ParseAndQueueEvent(e.Payload);

await using (var cmd = new NpgsqlCommand("LISTEN events", conn))
{
await cmd.ExecuteNonQueryAsync(ct);
}

Logger.LogInformation("Listening for db events");

while (!ct.IsCancellationRequested)
{
await conn.WaitAsync(ct);
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
Logger.LogError(ex, "DbEventBus error ocurred, reconnecting in 10 seconds");
await Task.Delay(10000, ct);
}
}
}

private void ParseAndQueueEvent(string payloadJson)
{
try
{
var dbEvent = payloadJson.Split(["->"], StringSplitOptions.None);
if (dbEvent.Length != 2) return;

var eventType = dbEvent[0];
var payloadStr = dbEvent[1];

var type = EventTypeCache.GetOrAdd(eventType, type => Type.GetType($"RIN.InternalAPI.Models.{type}"));
if (type == null)
{
Logger.LogWarning("Unknown event type: {eventType}", eventType);
return;
}

var payload = JsonSerializer.Deserialize(payloadStr, type);
if (payload is Event evt)
{
InternalChannel.Writer.TryWrite(evt);
}
}
catch (Exception ex)
{
Logger.LogError(ex, "Error parsing event payload: {payloadJson}", payloadJson);
}
}

private async Task ProcessEventsAsync(CancellationToken ct)
{
await foreach (var evt in InternalChannel.Reader.ReadAllAsync(ct))
{
try
{
if (evt is CharacterVisualsUpdated cvu)
{
_ = FetchCharacterVisuals(cvu);
}
else
{
DispatchToAll(evt);
}
}
catch (Exception ex)
{
Logger.LogError(ex, "Error processing event");
}
}
}

private async Task FetchCharacterVisuals(CharacterVisualsUpdated cvu)
{
try
{
var result = await Db.GetBasicCharacterAndVisualData((long)cvu.CharacterGuid);
var bfVisuals = PlayerBattleframeVisuals.CreateDefault();

cvu.CharacterAndBattleframeVisuals = new CharacterAndBattleframeVisuals
{
CharacterInfo = result.info,
CharacterVisuals = result.visuals,
BattleframeVisuals = bfVisuals
};

DispatchToAll(cvu);
}
catch (Exception ex)
{
Logger.LogError(ex, "Error while fetching character and battleframe visuals: {message}", ex.Message);
}
}

private void DispatchToAll(Event evt)
{
foreach (var sub in Subscribers)
{
sub.Value.Writer.TryWrite(evt);
}
}
}
}
112 changes: 47 additions & 65 deletions RIN.InternalAPI/Services/GameServerAPI.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
using System.Text.Json;
using System.Threading.Channels;
using Grpc.Core;
using Npgsql;
using RIN.Core.DB;
using RIN.InternalAPI.Models;

namespace RIN.InternalAPI.Services
{
public class GameServerAPI : IGameServerAPI
{
private readonly DB DB;
private readonly DB Db;
private readonly ILogger<GameServerAPI> Logger;
private readonly DbEventBus EventBus;
private readonly IHostApplicationLifetime Lifetime;

public GameServerAPI(DB db, ILogger<GameServerAPI> logger)
public GameServerAPI(DB db, ILogger<GameServerAPI> logger, DbEventBus eventBus, IHostApplicationLifetime lifetime)
{
DB = db;
Db = db;
Logger = logger;
EventBus = eventBus;
Lifetime = lifetime;
}

public async ValueTask<PingResp> Ping(PingReq req)
Expand All @@ -30,13 +33,13 @@ public async ValueTask<PingResp> Ping(PingReq req)

public async ValueTask<CharacterAndBattleframeVisuals> GetCharacterAndBattleframeVisuals(CharacterID req)
{
var result = await DB.GetBasicCharacterAndVisualData(req.ID);
var result = await Db.GetBasicCharacterAndVisualData(req.ID);
var bfVisuals = PlayerBattleframeVisuals.CreateDefault();

var resp = new CharacterAndBattleframeVisuals
{
CharacterInfo = result.Item1,
CharacterVisuals = result.Item2,
CharacterInfo = result.info,
CharacterVisuals = result.visuals,
BattleframeVisuals = bfVisuals
};

Expand All @@ -45,73 +48,52 @@ public async ValueTask<CharacterAndBattleframeVisuals> GetCharacterAndBattlefram

public async Task Stream(IAsyncStreamReader<Command> commands, IServerStreamWriter<Event> events, ServerCallContext context)
{
await using var connection = new NpgsqlConnection(DB.ConnStr);
await connection.OpenAsync();
var channel = Channel.CreateUnbounded<Event>();
var subscriptionId = EventBus.Subscribe(channel);

await using var cmd = new NpgsqlCommand("LISTEN events", connection);
await cmd.ExecuteNonQueryAsync();

connection.Notification += async (_, e) =>
using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, Lifetime.ApplicationStopping);
var token = cts.Token;
try
{
var dbEvent = e.Payload.Split(["->"], StringSplitOptions.None);
var eventType = dbEvent[0];
var payloadJson = dbEvent[1];

var type = Type.GetType($"RIN.InternalAPI.Models.{eventType}");

if (type == null)
{
Logger.LogError("Unknown event type: {eventType}", eventType);
return;
}

var payload = JsonSerializer.Deserialize(payloadJson, type);

if (payload is not Event evt)
{
Logger.LogError("Failed to deserialize payload for event type: {eventType}", eventType);
return;
}

if (evt is CharacterVisualsUpdated cvu)
{
var updatedVisuals = await GetCharacterAndBattleframeVisuals(
new CharacterID { ID = (long)cvu.CharacterGuid });

cvu.CharacterAndBattleframeVisuals = updatedVisuals;

await events.WriteAsync(cvu);
}
else
var sendEventsTask = Task.Run(async () =>
{
await events.WriteAsync(evt);
}
};
await foreach (var evt in channel.Reader.ReadAllAsync(token))
{
await events.WriteAsync(evt);
}
});

var commandsTask = Task.Run(async () =>
{
await foreach (var command in commands.ReadAllAsync())
var commandsTask = Task.Run(async () =>
{
Logger.LogInformation("Received command: {command}", command);

switch (command)
await foreach (var command in commands.ReadAllAsync(token))
{
case SaveGameSessionData data:
await DB.UpdateCharacterAfterGameSession((long)data.CharacterId, (int)data.ZoneId, (int)data.OutpostId, (int)data.TimePlayed);
break;
case SaveLgvRaceFinish race:
await DB.SaveLgvRaceFinish((long)race.CharacterGuid, (int)race.LeaderboardId, (long)race.TimeMs);
break;
Logger.LogInformation("Received command: {command}", command);

switch (command)
{
case SaveGameSessionData data:
await Db.UpdateCharacterAfterGameSession((long)data.CharacterId, (int)data.ZoneId, (int)data.OutpostId, (int)data.TimePlayed);
break;
case SaveLgvRaceFinish race:
await Db.SaveLgvRaceFinish((long)race.CharacterGuid, (int)race.LeaderboardId, (long)race.TimeMs);
break;
}
}
}
});
});

while (!context.CancellationToken.IsCancellationRequested)
await Task.WhenAny(sendEventsTask, commandsTask);
await cts.CancelAsync();
await Task.WhenAll(sendEventsTask, commandsTask);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
await connection.WaitAsync(context.CancellationToken);
Logger.LogError(ex, "GRPC Stream crashed");
}
finally
{
channel.Writer.TryComplete();
EventBus.Unsubscribe(subscriptionId);
}

await commandsTask;
}
}
}