diff --git a/RIN.InternalAPI/Program.cs b/RIN.InternalAPI/Program.cs index 6175687..ba912a4 100644 --- a/RIN.InternalAPI/Program.cs +++ b/RIN.InternalAPI/Program.cs @@ -27,6 +27,8 @@ public static void Main(string[] args) builder.Services.AddSingleton(); builder.Services.AddSingleton(); + builder.Services.AddSingleton(); + builder.Services.AddHostedService(provider => provider.GetRequiredService()); var app = builder.Build(); app.UseSerilogRequestLogging(); diff --git a/RIN.InternalAPI/Services/DbEventBus.cs b/RIN.InternalAPI/Services/DbEventBus.cs new file mode 100644 index 0000000..f70f3b1 --- /dev/null +++ b/RIN.InternalAPI/Services/DbEventBus.cs @@ -0,0 +1,164 @@ +using System.Collections.Concurrent; +using System.Text.Json; +using System.Threading.Channels; +using Npgsql; +using RIN.Core.DB; +using RIN.InternalAPI.Models; + +namespace RIN.InternalAPI.Services +{ + public class DbEventBus : BackgroundService + { + private readonly string ConnStr; + private readonly DB Db; + private readonly ILogger Logger; + private readonly ConcurrentDictionary> Subscribers = new(); + private readonly Channel InternalChannel = Channel.CreateUnbounded(); + private static readonly ConcurrentDictionary EventTypeCache = new(); + + public DbEventBus(DB db, ILogger logger) + { + Db = db; + ConnStr = db.ConnStr; + Logger = logger; + } + + public Guid Subscribe(Channel channel) + { + var id = Guid.NewGuid(); + Subscribers.TryAdd(id, channel); + return id; + } + + public void Unsubscribe(Guid id) => Subscribers.TryRemove(id, out _); + + protected override async Task ExecuteAsync(CancellationToken ct) + { + try + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + var listenTask = ListenToPostgres(cts.Token); + var processTask = ProcessEventsAsync(cts.Token); + + await Task.WhenAny(listenTask, processTask); + await cts.CancelAsync(); + await Task.WhenAll(listenTask, processTask); + } + catch (Exception ex) when (!ct.IsCancellationRequested) + { + Logger.LogError(ex, ex.Message); + } + } + + private async Task ListenToPostgres(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + try + { + await using var conn = new NpgsqlConnection(ConnStr); + await conn.OpenAsync(ct); + conn.Notification += (_, e) => ParseAndQueueEvent(e.Payload); + + await using (var cmd = new NpgsqlCommand("LISTEN events", conn)) + { + await cmd.ExecuteNonQueryAsync(ct); + } + + Logger.LogInformation("Listening for db events"); + + while (!ct.IsCancellationRequested) + { + await conn.WaitAsync(ct); + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + Logger.LogError(ex, "DbEventBus error ocurred, reconnecting in 10 seconds"); + await Task.Delay(10000, ct); + } + } + } + + private void ParseAndQueueEvent(string payloadJson) + { + try + { + var dbEvent = payloadJson.Split(["->"], StringSplitOptions.None); + if (dbEvent.Length != 2) return; + + var eventType = dbEvent[0]; + var payloadStr = dbEvent[1]; + + var type = EventTypeCache.GetOrAdd(eventType, type => Type.GetType($"RIN.InternalAPI.Models.{type}")); + if (type == null) + { + Logger.LogWarning("Unknown event type: {eventType}", eventType); + return; + } + + var payload = JsonSerializer.Deserialize(payloadStr, type); + if (payload is Event evt) + { + InternalChannel.Writer.TryWrite(evt); + } + } + catch (Exception ex) + { + Logger.LogError(ex, "Error parsing event payload: {payloadJson}", payloadJson); + } + } + + private async Task ProcessEventsAsync(CancellationToken ct) + { + await foreach (var evt in InternalChannel.Reader.ReadAllAsync(ct)) + { + try + { + if (evt is CharacterVisualsUpdated cvu) + { + _ = FetchCharacterVisuals(cvu); + } + else + { + DispatchToAll(evt); + } + } + catch (Exception ex) + { + Logger.LogError(ex, "Error processing event"); + } + } + } + + private async Task FetchCharacterVisuals(CharacterVisualsUpdated cvu) + { + try + { + var result = await Db.GetBasicCharacterAndVisualData((long)cvu.CharacterGuid); + var bfVisuals = PlayerBattleframeVisuals.CreateDefault(); + + cvu.CharacterAndBattleframeVisuals = new CharacterAndBattleframeVisuals + { + CharacterInfo = result.info, + CharacterVisuals = result.visuals, + BattleframeVisuals = bfVisuals + }; + + DispatchToAll(cvu); + } + catch (Exception ex) + { + Logger.LogError(ex, "Error while fetching character and battleframe visuals: {message}", ex.Message); + } + } + + private void DispatchToAll(Event evt) + { + foreach (var sub in Subscribers) + { + sub.Value.Writer.TryWrite(evt); + } + } + } +} diff --git a/RIN.InternalAPI/Services/GameServerAPI.cs b/RIN.InternalAPI/Services/GameServerAPI.cs index 7dab23b..bd1478b 100644 --- a/RIN.InternalAPI/Services/GameServerAPI.cs +++ b/RIN.InternalAPI/Services/GameServerAPI.cs @@ -1,6 +1,5 @@ -using System.Text.Json; +using System.Threading.Channels; using Grpc.Core; -using Npgsql; using RIN.Core.DB; using RIN.InternalAPI.Models; @@ -8,13 +7,17 @@ namespace RIN.InternalAPI.Services { public class GameServerAPI : IGameServerAPI { - private readonly DB DB; + private readonly DB Db; private readonly ILogger Logger; + private readonly DbEventBus EventBus; + private readonly IHostApplicationLifetime Lifetime; - public GameServerAPI(DB db, ILogger logger) + public GameServerAPI(DB db, ILogger logger, DbEventBus eventBus, IHostApplicationLifetime lifetime) { - DB = db; + Db = db; Logger = logger; + EventBus = eventBus; + Lifetime = lifetime; } public async ValueTask Ping(PingReq req) @@ -30,13 +33,13 @@ public async ValueTask Ping(PingReq req) public async ValueTask GetCharacterAndBattleframeVisuals(CharacterID req) { - var result = await DB.GetBasicCharacterAndVisualData(req.ID); + var result = await Db.GetBasicCharacterAndVisualData(req.ID); var bfVisuals = PlayerBattleframeVisuals.CreateDefault(); var resp = new CharacterAndBattleframeVisuals { - CharacterInfo = result.Item1, - CharacterVisuals = result.Item2, + CharacterInfo = result.info, + CharacterVisuals = result.visuals, BattleframeVisuals = bfVisuals }; @@ -45,73 +48,52 @@ public async ValueTask GetCharacterAndBattlefram public async Task Stream(IAsyncStreamReader commands, IServerStreamWriter events, ServerCallContext context) { - await using var connection = new NpgsqlConnection(DB.ConnStr); - await connection.OpenAsync(); + var channel = Channel.CreateUnbounded(); + var subscriptionId = EventBus.Subscribe(channel); - await using var cmd = new NpgsqlCommand("LISTEN events", connection); - await cmd.ExecuteNonQueryAsync(); - - connection.Notification += async (_, e) => + using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, Lifetime.ApplicationStopping); + var token = cts.Token; + try { - var dbEvent = e.Payload.Split(["->"], StringSplitOptions.None); - var eventType = dbEvent[0]; - var payloadJson = dbEvent[1]; - - var type = Type.GetType($"RIN.InternalAPI.Models.{eventType}"); - - if (type == null) - { - Logger.LogError("Unknown event type: {eventType}", eventType); - return; - } - - var payload = JsonSerializer.Deserialize(payloadJson, type); - - if (payload is not Event evt) - { - Logger.LogError("Failed to deserialize payload for event type: {eventType}", eventType); - return; - } - - if (evt is CharacterVisualsUpdated cvu) - { - var updatedVisuals = await GetCharacterAndBattleframeVisuals( - new CharacterID { ID = (long)cvu.CharacterGuid }); - - cvu.CharacterAndBattleframeVisuals = updatedVisuals; - - await events.WriteAsync(cvu); - } - else + var sendEventsTask = Task.Run(async () => { - await events.WriteAsync(evt); - } - }; + await foreach (var evt in channel.Reader.ReadAllAsync(token)) + { + await events.WriteAsync(evt); + } + }); - var commandsTask = Task.Run(async () => - { - await foreach (var command in commands.ReadAllAsync()) + var commandsTask = Task.Run(async () => { - Logger.LogInformation("Received command: {command}", command); - - switch (command) + await foreach (var command in commands.ReadAllAsync(token)) { - case SaveGameSessionData data: - await DB.UpdateCharacterAfterGameSession((long)data.CharacterId, (int)data.ZoneId, (int)data.OutpostId, (int)data.TimePlayed); - break; - case SaveLgvRaceFinish race: - await DB.SaveLgvRaceFinish((long)race.CharacterGuid, (int)race.LeaderboardId, (long)race.TimeMs); - break; + Logger.LogInformation("Received command: {command}", command); + + switch (command) + { + case SaveGameSessionData data: + await Db.UpdateCharacterAfterGameSession((long)data.CharacterId, (int)data.ZoneId, (int)data.OutpostId, (int)data.TimePlayed); + break; + case SaveLgvRaceFinish race: + await Db.SaveLgvRaceFinish((long)race.CharacterGuid, (int)race.LeaderboardId, (long)race.TimeMs); + break; + } } - } - }); + }); - while (!context.CancellationToken.IsCancellationRequested) + await Task.WhenAny(sendEventsTask, commandsTask); + await cts.CancelAsync(); + await Task.WhenAll(sendEventsTask, commandsTask); + } + catch (Exception ex) when (ex is not OperationCanceledException) { - await connection.WaitAsync(context.CancellationToken); + Logger.LogError(ex, "GRPC Stream crashed"); + } + finally + { + channel.Writer.TryComplete(); + EventBus.Unsubscribe(subscriptionId); } - - await commandsTask; } } }