-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathWebsockBaseController.cs
More file actions
276 lines (234 loc) · 9.07 KB
/
WebsockBaseController.cs
File metadata and controls
276 lines (234 loc) · 9.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
using System.Net.Mime;
using System.Net.WebSockets;
using System.Text.Json;
using System.Threading.Channels;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using OneOf;
using OneOf.Types;
using OpenShock.Common.Errors;
using OpenShock.Common.Problems;
using OpenShock.Common.Utils;
using JsonOptions = Microsoft.AspNetCore.Http.Json.JsonOptions;
namespace OpenShock.Common.Websocket;
/// <summary>
/// Base for JSON-serialized websocket controllers. Override <see cref="SendWebSocketMessage"/> to implement a different serializer.
/// </summary>
/// <typeparam name="T">Type of the messages queued for sending to the client</typeparam>
public abstract class WebsocketBaseController<T> : OpenShockControllerBase, IAsyncDisposable, IDisposable,
IWebsocketController<T> where T : class
{
/// <inheritdoc />
public abstract Guid Id { get; }
/// <summary>
/// Logger
/// </summary>
protected readonly ILogger<WebsocketBaseController<T>> Logger;
/// <summary>
/// Source of <see cref="LinkedToken"/>. Created in <see cref="Get"/> and disposed in <see cref="DisposeAsync"/>.
/// </summary>
private CancellationTokenSource? _linkedSource;
/// <summary>
/// When passing a cancellation token, pass this linked token. It is linked to ApplicationStopping and to the
/// cancellation token handed to <see cref="Get"/>, so it is cancelled when either of them is cancelled.
/// It holds the default (never cancelled) token until <see cref="Get"/> has assigned it.
/// </summary>
protected CancellationToken LinkedToken;
/// <summary>
/// Unbounded queue of outgoing messages, used for thread safety of the websocket: callers enqueue through
/// <see cref="QueueMessage"/> and MessageLoop is intended to be the only reader of this channel.
/// </summary>
protected readonly Channel<T> Channel = System.Threading.Channels.Channel.CreateUnbounded<T>();
#pragma warning disable IDISP008
/// <summary>
/// The websocket of this connection. Null until <see cref="Get"/> has accepted the websocket request;
/// disposed in <see cref="DisposeAsync"/>.
/// </summary>
protected WebSocket? WebSocket;
#pragma warning restore IDISP008
/// <summary>
/// Constructor for dependency injection, called through the derived controller.
/// </summary>
/// <param name="logger">Logger stored in <see cref="Logger"/></param>
protected WebsocketBaseController(ILogger<WebsocketBaseController<T>> logger) => Logger = logger;
/// <inheritdoc />
[NonAction]
public ValueTask QueueMessage(T data)
{
    // Enqueue only; the actual socket write is done by MessageLoop, which drains the channel.
    var outgoing = Channel.Writer;
    return outgoing.WriteAsync(data, LinkedToken);
}
private bool _disposed;
/// <inheritdoc />
/// <remarks>
/// Synchronous wrapper that blocks until <see cref="DisposeAsync"/> has completed.
/// Prefer <see cref="DisposeAsync"/> wherever the caller is able to await.
/// </remarks>
[NonAction]
public virtual void Dispose()
{
    // GetAwaiter().GetResult() rethrows the original exception, whereas Task.Wait() would wrap it
    // in an AggregateException. AsTask() is kept because a ValueTask must not be blocked on directly.
    DisposeAsync().AsTask().GetAwaiter().GetResult();
}
/// <inheritdoc />
/// <remarks>
/// Runs at most once per instance. Calls the <see cref="DisposeControllerAsync"/> and
/// <see cref="UnregisterConnection"/> hooks, then completes the outgoing channel and disposes the
/// websocket and the linked cancellation token source. The owned resources are released even if
/// one of the hooks throws; the exception still propagates to the caller.
/// </remarks>
[NonAction]
public virtual async ValueTask DisposeAsync()
{
    if (_disposed) return;
    _disposed = true;

    Logger.LogTrace("Disposing websocket controller..");

    try
    {
        await DisposeControllerAsync();
        await UnregisterConnection();
    }
    finally
    {
        // _disposed is already set, so a later DisposeAsync call would return early and never
        // reach this cleanup. It therefore has to run here even when a hook above failed.
        Channel.Writer.TryComplete();
        WebSocket?.Dispose();
        _linkedSource?.Dispose();
        GC.SuppressFinalize(this);
    }

    Logger.LogTrace("Disposed websocket controller");
}
/// <summary>
/// Dispose hook for inheriting controllers. Awaited by <see cref="DisposeAsync"/> before the base
/// class releases its own resources. The default implementation does nothing.
/// </summary>
/// <returns>A completed <see cref="ValueTask"/> in the default implementation</returns>
[NonAction]
protected virtual ValueTask DisposeControllerAsync()
{
    return ValueTask.CompletedTask;
}
/// <summary>
/// Initial GET request to the websocket route - upgrades the request to a websocket connection and
/// runs the connection until the receive loop ends.
/// </summary>
/// <remarks>
/// Requests that are not websocket requests, or that fail <see cref="ConnectionPrecondition"/>, are
/// answered with a problem JSON response and no websocket is opened.
/// </remarks>
/// <param name="lifetime">Application lifetime; its ApplicationStopping token is linked into <see cref="LinkedToken"/></param>
/// <param name="cancellationToken">Cancellation token of the request; linked into <see cref="LinkedToken"/></param>
[ApiExplorerSettings(IgnoreApi = true)]
[HttpGet]
public async Task Get([FromServices] IHostApplicationLifetime lifetime, CancellationToken cancellationToken)
{
#pragma warning disable IDISP003
    _linkedSource = CancellationTokenSource.CreateLinkedTokenSource(lifetime.ApplicationStopping, cancellationToken);
#pragma warning restore IDISP003
    LinkedToken = _linkedSource.Token;

    if (!HttpContext.WebSockets.IsWebSocketRequest)
    {
        var jsonOptions = HttpContext.RequestServices.GetRequiredService<IOptions<JsonOptions>>();
        HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
        var response = WebsocketError.NonWebsocketRequest;
        response.AddContext(HttpContext);
        await HttpContext.Response.WriteAsJsonAsync(
            response,
            jsonOptions.Value.SerializerOptions,
            contentType: MediaTypeNames.Application.ProblemJson,
            cancellationToken: cancellationToken);
        return;
    }

    var connectionPrecondition = await ConnectionPrecondition();
    if (connectionPrecondition.IsT1)
    {
        var jsonOptions = HttpContext.RequestServices.GetRequiredService<IOptions<JsonOptions>>();
        var response = connectionPrecondition.AsT1.Value;
        // Fall back to 400 when the problem does not carry its own status code
        HttpContext.Response.StatusCode = response.Status ?? StatusCodes.Status400BadRequest;
        response.AddContext(HttpContext);
        await HttpContext.Response.WriteAsJsonAsync(
            response,
            jsonOptions.Value.SerializerOptions,
            contentType: MediaTypeNames.Application.ProblemJson,
            cancellationToken: cancellationToken);
        return;
    }

    Logger.LogInformation("Opening websocket connection");

#pragma warning disable IDISP003
    WebSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
#pragma warning restore IDISP003

    // The send loop runs in the background for the lifetime of the connection; it is intentionally not awaited
#pragma warning disable CS4014
    OsTask.Run(MessageLoop);
#pragma warning restore CS4014

    await SendInitialData();

    await Logic();

    // Logic ended
    await UnregisterConnection();

    // Only send close if the socket is still open, this allows us to close the websocket from inside the logic
    // We send close if the client sent a close message though
    if (WebSocket is { State: WebSocketState.Open or WebSocketState.CloseReceived })
    {
        try
        {
            await WebSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Normal closure",
                LinkedToken);
        }
        catch (OperationCanceledException)
        {
            // Logic also returns when LinkedToken is cancelled (shutdown or aborted request). In that
            // case the close frame cannot be sent any more; do not let the cancellation escape the action.
            Logger.LogDebug("Websocket close was skipped because the connection was cancelled");
        }
    }
}
#region Send Loop
/// <summary>
/// Send loop: drains the outgoing channel and writes every message to the websocket through
/// <see cref="SendWebSocketMessage"/>. A failed send is logged and rethrown, which ends the loop.
/// </summary>
[NonAction]
private async Task MessageLoop()
{
    var reader = Channel.Reader;
    var readToken = LinkedToken;

    // Wait until data is available (or the channel completes), then take everything that is buffered
    while (await reader.WaitToReadAsync(readToken))
    {
        while (reader.TryRead(out var pending))
        {
            try
            {
                await SendWebSocketMessage(pending, WebSocket!, LinkedToken);
            }
            catch (Exception sendError)
            {
                Logger.LogError(sendError, "Error while sending message to client - {Msg}", JsonSerializer.Serialize(pending));
                throw;
            }
        }
    }
}
/// <summary>
/// Writes a single message to the websocket. The default implementation delegates to
/// JsonWebSocketUtils.SendFullMessage; override it to apply a different serialization.
/// </summary>
/// <param name="message">Message taken from the outgoing channel</param>
/// <param name="websocket">Websocket to write to</param>
/// <param name="cancellationToken">Token forwarded to the send operation</param>
/// <returns>Task of the send operation</returns>
[NonAction]
protected virtual Task SendWebSocketMessage(T message, WebSocket websocket, CancellationToken cancellationToken)
{
    return JsonWebSocketUtils.SendFullMessage(message, websocket, cancellationToken);
}
#endregion
/// <summary>
/// Main receiver logic for the websocket. Awaits <see cref="HandleReceive"/> in a loop while the
/// websocket is open and <see cref="LinkedToken"/> is not cancelled.
/// </summary>
/// <returns>
/// A task that completes when the loop ends: on cancellation, when the websocket is missing or no
/// longer open, when <see cref="HandleReceive"/> returns false, or after an exception was logged.
/// </returns>
[NonAction]
private async Task Logic()
{
    while (!LinkedToken.IsCancellationRequested)
    {
        try
        {
            if (WebSocket is null)
            {
                Logger.LogWarning("WebSocket is null, aborting");
                return;
            }

            if (WebSocket.State is WebSocketState.CloseReceived or WebSocketState.CloseSent or WebSocketState.Closed)
            {
                // Client or we sent close message or both, we will close the connection after this
                return;
            }

            if (WebSocket.State != WebSocketState.Open)
            {
                Logger.LogWarning("WebSocket is not open [{State}], aborting", WebSocket.State);
                WebSocket.Abort();
                return;
            }

            // HandleReceive documents "false" as the request to terminate the receiver loop
            if (!await HandleReceive())
            {
                return;
            }
        }
        catch (OperationCanceledException)
        {
            Logger.LogWarning("WebSocket connection terminated due to close or shutdown");
            return;
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "Exception while processing websocket request");
            // The connection state is unknown after an unexpected failure, so drop it without a close handshake
            WebSocket?.Abort();
            return;
        }
    }
}
/// <summary>
/// Receive step implemented by the derived controller. Awaited by the receiver loop on each
/// iteration while the websocket is in the Open state.
/// </summary>
/// <returns>True if you want to continue the receiver loop, false if you want to terminate</returns>
[NonAction]
protected abstract Task<bool> HandleReceive();
/// <summary>
/// Hook for sending initial data to the client. Awaited by <see cref="Get"/> after the websocket was
/// accepted and the send loop was started, before the receiver loop runs. Does nothing by default.
/// </summary>
/// <returns>A completed task in the default implementation</returns>
[NonAction]
protected virtual Task SendInitialData()
{
    return Task.CompletedTask;
}
/// <summary>
/// Hook for when the websocket connection is finished or disposed. Awaited by <see cref="Get"/> after
/// the receiver loop has ended and also by <see cref="DisposeAsync"/>, so an override can be invoked
/// more than once for the same connection. Does nothing by default.
/// </summary>
/// <returns>A completed task in the default implementation</returns>
[NonAction]
protected virtual Task UnregisterConnection()
{
    return Task.CompletedTask;
}
/// <summary>
/// Precondition checked by <see cref="Get"/> before the websocket is accepted. When an error is
/// returned, its problem is written to the response as JSON and no websocket connection is opened.
/// The default implementation always succeeds.
/// </summary>
/// <returns>Success to continue with the connection, otherwise the error holding the problem to respond with</returns>
[NonAction]
protected virtual Task<OneOf<Success, Error<OpenShockProblem>>> ConnectionPrecondition()
{
    OneOf<Success, Error<OpenShockProblem>> passed = new Success();
    return Task.FromResult(passed);
}
}