-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathClusterQueryExecutor.cs
More file actions
255 lines (211 loc) · 10.6 KB
/
ClusterQueryExecutor.cs
File metadata and controls
255 lines (211 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Couchbase.Core.IO.Serializers;
using Couchbase.Core.Version;
using Couchbase.Linq.Clauses;
using Couchbase.Linq.QueryGeneration;
using Couchbase.Linq.QueryGeneration.MemberNameResolvers;
using Couchbase.Linq.Utils;
using Couchbase.Linq.Versioning;
using Couchbase.Query;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Remotion.Linq;
namespace Couchbase.Linq.Execution
{
internal class ClusterQueryExecutor : IAsyncQueryExecutor
{
private readonly ICluster _cluster;
private readonly ILogger<ClusterQueryExecutor> _logger;
private readonly IClusterVersionProvider _clusterVersionProvider;
private ITypeSerializer? _serializer;
private ITypeSerializer Serializer =>
_serializer ??= _cluster.ClusterServices.GetRequiredService<ITypeSerializer>();
/// <summary>
/// Query timeout callback, if null uses the cluster default.
/// </summary>
public Func<TimeSpan?>? QueryTimeoutProvider { get; set; }
/// <summary>
/// Creates a new BucketQueryExecutor.
/// </summary>
/// <param name="cluster"><see cref="ICluster"/> to query.</param>
public ClusterQueryExecutor(ICluster cluster)
{
_cluster = cluster;
_logger = cluster.ClusterServices.GetRequiredService<ILogger<ClusterQueryExecutor>>();
_clusterVersionProvider = cluster.ClusterServices.GetRequiredService<IClusterVersionProvider>();
}
private LinqQueryOptions GetQueryOptions(QueryModel queryModel, ScalarResultBehavior scalarResultBehavior)
{
var queryOptions = new LinqQueryOptions(scalarResultBehavior);
MutationState? combinedMutationState = null;
foreach (var bodyClause in queryModel.BodyClauses)
{
switch (bodyClause)
{
case ScanConsistencyClause scanConsistency:
queryOptions.ScanConsistency(scanConsistency.ScanConsistency);
if (scanConsistency.ScanWait != null)
{
queryOptions.ScanWait(scanConsistency.ScanWait.Value);
}
break;
case ConsistentWithClause consistentWith:
combinedMutationState ??= new MutationState();
combinedMutationState.Add(consistentWith.MutationState);
if (consistentWith.ScanWait != null)
{
queryOptions.ScanWait(consistentWith.ScanWait.Value);
}
break;
}
}
if (combinedMutationState != null)
{
queryOptions.ConsistentWith(combinedMutationState);
}
var queryTimeout = QueryTimeoutProvider?.Invoke();
if (queryTimeout is not null)
{
queryOptions.Timeout(queryTimeout.GetValueOrDefault());
}
return queryOptions;
}
public IEnumerable<T> ExecuteCollection<T>(QueryModel queryModel)
{
var statement = GenerateQuery(queryModel, out var scalarResultBehavior);
return ExecuteCollection<T>(statement, GetQueryOptions(queryModel, scalarResultBehavior));
}
/// <summary>
/// Execute a <see cref="LinqQueryOptions"/>.
/// </summary>
/// <typeparam name="T">Type returned by the query.</typeparam>
/// <param name="statement">Query to execute.</param>
/// <param name="queryOptions">Options to control execution.</param>
/// <returns>List of objects returned by the request.</returns>
public IEnumerable<T> ExecuteCollection<T>(string statement, LinqQueryOptions queryOptions)
{
// This sync-over-async pattern is not ideal, but necessary for compatibility
var asyncEnumerable = ExecuteCollectionAsync<T>(statement, queryOptions);
var enumerator = asyncEnumerable.GetAsyncEnumerator();
try
{
while (BlockingWait(enumerator.MoveNextAsync()))
{
yield return enumerator.Current;
}
}
finally
{
BlockingWait(enumerator.DisposeAsync());
}
}
private static void BlockingWait(ValueTask task)
{
if (task.IsCompleted)
{
// Avoid allocating a Task unnecessarily if the ValueTask is already completed
task.GetAwaiter().GetResult();
return;
}
// ValueTask must be converted to Task to safely await the result if it is not completed
task.AsTask().GetAwaiter().GetResult();
}
private static T BlockingWait<T>(ValueTask<T> task)
{
if (task.IsCompleted)
{
// Avoid allocating a Task unnecessarily if the ValueTask is already completed
return task.GetAwaiter().GetResult();
}
// ValueTask must be converted to Task to safely await the result if it is not completed
return task.AsTask().GetAwaiter().GetResult();
}
public IAsyncEnumerable<T> ExecuteCollectionAsync<T>(QueryModel queryModel, CancellationToken cancellationToken = default)
{
var statement = GenerateQuery(queryModel, out var scalarResultBehavior);
return ExecuteCollectionAsync<T>(statement, GetQueryOptions(queryModel, scalarResultBehavior), cancellationToken);
}
/// <summary>
/// Asynchronously execute a <see cref="LinqQueryOptions"/>.
/// </summary>
/// <typeparam name="T">Type returned by the query.</typeparam>
/// <param name="statement">Query to execute.</param>
/// <param name="queryOptions">Options to control execution.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Task which contains a list of objects returned by the request when complete.</returns>
public async IAsyncEnumerable<T> ExecuteCollectionAsync<T>(string statement, LinqQueryOptions queryOptions,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// TODO: Make this more efficient with a custom enumerator
queryOptions.CancellationToken(cancellationToken);
IAsyncEnumerable<T> result;
if (!queryOptions.ScalarResultBehavior.ResultExtractionRequired)
{
result = await _cluster.QueryAsync<T>(statement, queryOptions).ConfigureAwait(false);
}
else
{
var tempResult = await _cluster.QueryAsync<ScalarResult<T>>(statement, queryOptions).ConfigureAwait(false);
result = queryOptions.ScalarResultBehavior.ApplyResultExtraction(tempResult);
}
await foreach (var row in result.WithCancellation(cancellationToken).ConfigureAwait(false))
{
yield return row;
}
}
public T ExecuteScalar<T>(QueryModel queryModel)=>
ExecuteSingle<T>(queryModel, false)!;
public T? ExecuteSingle<T>(QueryModel queryModel, bool returnDefaultWhenEmpty) =>
returnDefaultWhenEmpty
? ExecuteCollection<T>(queryModel).SingleOrDefault()
: ExecuteCollection<T>(queryModel).Single();
public Task<T> ExecuteScalarAsync<T>(QueryModel queryModel, CancellationToken cancellationToken = default) =>
ExecuteSingleAsync<T>(queryModel, false, cancellationToken)!;
public Task<T?> ExecuteSingleAsync<T>(QueryModel queryModel, bool returnDefaultWhenEmpty, CancellationToken cancellationToken = default)
{
// ReSharper disable MethodSupportsCancellation
#pragma warning disable CS8619 // Nullability of reference types in value doesn't match target type.
ValueTask<T?> result = returnDefaultWhenEmpty
? ExecuteCollectionAsync<T>(queryModel).SingleOrDefaultAsync(cancellationToken)
: ExecuteCollectionAsync<T>(queryModel).SingleAsync(cancellationToken);
#pragma warning restore CS8619 // Nullability of reference types in value doesn't match target type.
// ReSharper restore MethodSupportsCancellation
return result.AsTask();
}
public string GenerateQuery(QueryModel queryModel, out ScalarResultBehavior scalarResultBehavior)
{
// If ITypeSerializer is an IExtendedTypeSerializer, use it as the member name resolver
// Otherwise fallback to the legacy behavior which assumes we're using Newtonsoft.Json
// Note that DefaultSerializer implements IExtendedTypeSerializer, but has the same logic as JsonNetMemberNameResolver
var serializer = Serializer as IExtendedTypeSerializer;
var memberNameResolver = serializer != null ?
(IMemberNameResolver)new ExtendedTypeSerializerMemberNameResolver(serializer) :
(IMemberNameResolver)new JsonNetMemberNameResolver(JsonConvert.DefaultSettings!().ContractResolver!);
var methodCallTranslatorProvider = new DefaultMethodCallTranslatorProvider();
var clusterVersionTask = _clusterVersionProvider.GetVersionAsync();
var clusterVersion = clusterVersionTask.IsCompleted
? clusterVersionTask.Result
// TODO: Don't use .Result to block
: clusterVersionTask.AsTask().Result; // Must convert ValueTask to Task to safely await the result if it is not completed
var queryGenerationContext = new N1QlQueryGenerationContext
{
MemberNameResolver = memberNameResolver,
MethodCallTranslatorProvider = methodCallTranslatorProvider,
Serializer = serializer,
ClusterVersion = clusterVersion ?? FeatureVersions.DefaultVersion,
LoggerFactory = _cluster.ClusterServices.GetRequiredService<ILoggerFactory>()
};
var visitor = new N1QlQueryModelVisitor(queryGenerationContext);
visitor.VisitQueryModel(queryModel);
var query = visitor.GetQuery();
_logger.LogDebug("Generated query: {query}", query);
scalarResultBehavior = visitor.ScalarResultBehavior;
return query;
}
}
}