📄
SearchService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.Extensions.Logging; namespace MSearch.Domain; public sealed partial class SearchService(ILogger<SearchService> logger, IEnumerable<ISearchProvider> searchProviders) { private readonly IReadOnlyCollection<ISearchProvider> searchProviders = [.. searchProviders]; public IAsyncEnumerable<SearchResult> Search(string term, CancellationToken cancellationToken) { var query = new SearchQuery(term); var channel = Channel.CreateUnbounded<SearchResult>(new() { SingleReader = true }); StartProviders(query, channel.Writer, cancellationToken); return channel.Reader.ReadAllAsync(cancellationToken); } private async void StartProviders( SearchQuery query, ChannelWriter<SearchResult> writer, CancellationToken cancellationToken ) { try { var searchTasks = searchProviders.Select(sp => StartProvider(query, sp, writer, cancellationToken)); await Task.WhenAll(searchTasks); } finally { var completed = writer.TryComplete(); if (!completed) { LogUncompletedChannel(); } } } private async Task StartProvider( SearchQuery query, ISearchProvider provider, ChannelWriter<SearchResult> writer, CancellationToken cancellationToken ) { try { await foreach (var result in provider.Search(query, cancellationToken)) { await writer.WriteAsync(result, cancellationToken); } } catch (Exception ex) { LogSearchProviderException(ex); } } [LoggerMessage(LogLevel.Error, "Failed when processing search provider results.")] private partial void LogSearchProviderException(Exception exception); [LoggerMessage(LogLevel.Warning, "Could not close channel after writing all search results. Leaving channel open.")] private partial void LogUncompletedChannel(); }