{"results":{"result":{"added-files":{"code-health":9.232482912336911,"old-code-health":0.0,"files":[{"file":"src/Paramore.Brighter.SourceGenerators/BrighterRegistrationsGenerator.cs","loc":131,"code-health":10.0},{"file":"src/Paramore.Brighter.SourceGenerators/MarkerSymbols.cs","loc":34,"code-health":10.0},{"file":"src/Paramore.Brighter.SourceGenerators/Model/EquatableArray.cs","loc":42,"code-health":10.0},{"file":"src/Paramore.Brighter.SourceGenerators/Model/RegistrationModel.cs","loc":79,"code-health":9.6882083290695},{"file":"src/Paramore.Brighter.SourceGenerators/RegistrationWriter.cs","loc":147,"code-health":9.485374140625545},{"file":"src/Paramore.Brighter.SourceGenerators/SemanticModelReader.cs","loc":224,"code-health":7.96010775753639},{"file":"src/Paramore.Brighter.SourceGenerators/Model/PipelineModels.cs","loc":51,"code-health":10.0},{"file":"src/Paramore.Brighter.Extensions.DependencyInjection/BrighterBuilderExtensions.cs","loc":18,"code-health":10.0}]},"external-review-url":"https://github.com/BrighterCommand/Brighter/pull/4138","old-code-health":8.816158827775617,"modified-files":{"code-health":8.816158827775617,"old-code-health":8.816158827775617,"files":[{"file":"src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionBrighterBuilder.cs","loc":186,"old-loc":179,"code-health":8.816158827775617,"old-code-health":8.816158827775617}]},"removed-files":{"code-health":0.0,"old-code-health":0.0,"files":[]},"external-review-id":"4138","analysis-time":"2026-05-18T23:21:07Z","negative-impact-count":10,"suppressions":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"affected-hotspots":1,"commits":["57fdcd270cc7512accf7cd2efbac7737f45beb9d","30a39ffe3182079ee56af1555b35a25cd8033b8a","5b45c44cfbbf7b6fb88c80b32cd20c330773b605","41f49d45605bb6d382b41361a264c934d01dc245","587a9243cdc442a98697b8d67eeaf85b19b10b9d","fe251bb3f82c2967a4c53a7a8daad4fe8f9b8ab8","af0f07006fe214d05a87b6490a2fee91a96021dd","1979663233e9a9d1424ccda9ac3a28df043bf72b"],"is-negative-review":true,"negative-findings":{"number-of-types":5,"number-of-files-touched":3,"findings":[{"method":"RegistrationModel","why-it-occurs":"Functions with many arguments indicate either a) low cohesion where the function has too many responsibilities, or b) a missing abstraction that encapsulates those arguments.\n\nThe threshold for the C# language is 4 function arguments.","name":"Excess Number of Function Arguments","file":"src/Paramore.Brighter.SourceGenerators/Model/RegistrationModel.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"20","loc-deleted":"14","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"7.534418959981518"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"JSON and YAML outputs previously described different documents because Neuroglia\n4.20.0 applies different default naming conventions in its JSON and YAML serialisers,\nand embedded JSON Schema fragments emitted PascalCase property names that did not\nmatch Brighter's camelCase wire format.\n\nLibrary changes:\n\n- Derive YAML from the JSON tree we just wrote. Guarantees both formats describe the\n  same document, regardless of upstream serialiser config drift.\n- Strip Neuroglia internals (isReference) and empty collections from both outputs.\n- Switch NJsonSchema generator to System.Text.Json reflection with camelCase property\n  policy and JsonStringEnumConverter, mirroring JsonSerialisationOptions.Options. The\n  schema now describes what JsonMessageMapper actually puts on the wire.\n- Declare draft-04 in SchemaFormat to match what NJsonSchema 11.x emits\n  (JsonSchema.ToJson hardcodes the dialect).\n- Refactor ProcessSourceAsync to take a MessageSource record, resolving the CodeScene\n  excess-arguments check.\n- Backfill missing XML docs on AsyncApiOptions and fix a stale paramref on\n  IAmASchemaGenerator.\n\nSample fix:\n\n- Kafka sample no longer requires a live broker for --generate-asyncapi.\n  KafkaProducerRegistryFactory.Create connects eagerly, so build it only on the\n  runtime path; assembly scanning still surfaces publications in the doc.\n\nTests: 46/46 pass on net9.0 and net10.0.\n\nCo-Authored-By: Claude <noreply@anthropic.com>","commit-date":"2026-05-16T19:01:42Z","current-rev":"5e03da283","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs","previous-rev":"70b03a0ee","commit-title":"fix(#3828): align AsyncAPI output with wire format, eliminate JSON/YAML divergence","language":"C#","id":"f49c3320ae617404ccf0324e7fe2dad4565432c7","model-score":0.28,"author-id":null,"project-id":32198,"delta-file-score":0.22637527,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\nindex 2a306a512..bd7ae8957 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n@@ -51,2 +51,10 @@ private sealed record GenerationContext(\n \n+        // Inputs that vary per call into ProcessSourceAsync. Grouping them keeps the method\n+        // signature small (and satisfies the CodeScene \"Excess Number of Function Arguments\"\n+        // check) without flattening the call sites with positional parameters.\n+        private sealed record MessageSource(\n+            string Address,\n+            V3OperationAction Action,\n+            Type? RequestType);\n+\n         private readonly AsyncApiOptions _options;\n@@ -116,3 +124,3 @@ private async Task AddSubscriptionsAsync(\n                 await ProcessSourceAsync(\n-                    subscription.RoutingKey.Value, V3OperationAction.Receive, subscription.RequestType,\n+                    new MessageSource(subscription.RoutingKey.Value, V3OperationAction.Receive, subscription.RequestType),\n                     context, ct).ConfigureAwait(false);\n@@ -133,3 +141,3 @@ private async Task AddPublicationsAsync(\n                 await ProcessSourceAsync(\n-                    publication.Topic.Value, V3OperationAction.Send, publication.RequestType,\n+                    new MessageSource(publication.Topic.Value, V3OperationAction.Send, publication.RequestType),\n                     context, ct).ConfigureAwait(false);\n@@ -139,5 +147,3 @@ await ProcessSourceAsync(\n         private async Task ProcessSourceAsync(\n-            string address,\n-            V3OperationAction action,\n-            Type? requestType,\n+            MessageSource source,\n             GenerationContext context,\n@@ -145,11 +151,11 @@ private async Task ProcessSourceAsync(\n         {\n-            var channelId = SanitizeChannelId(address);\n+            var channelId = SanitizeChannelId(source.Address);\n \n-            EnsureChannel(context.Channels, channelId, address);\n+            EnsureChannel(context.Channels, channelId, source.Address);\n \n             string messageName;\n-            if (requestType != null)\n+            if (source.RequestType != null)\n             {\n-                messageName = requestType.Name;\n-                await EnsureMessageAsync(context.Messages, messageName, requestType, ct).ConfigureAwait(false);\n+                messageName = source.RequestType.Name;\n+                await EnsureMessageAsync(context.Messages, messageName, source.RequestType, ct).ConfigureAwait(false);\n             }\n@@ -163,3 +169,3 @@ private async Task ProcessSourceAsync(\n \n-            var actionString = action == V3OperationAction.Send ? \"send\" : \"receive\";\n+            var actionString = source.Action == V3OperationAction.Send ? \"send\" : \"receive\";\n             context.CoveredChannelActions.Add((channelId, actionString));\n@@ -169,3 +175,3 @@ private async Task ProcessSourceAsync(\n             {\n-                Action = action,\n+                Action = source.Action,\n                 Channel = new V3ReferenceDefinition { Reference = $\"#/channels/{channelId}\" },\n@@ -290,3 +296,3 @@ private static void EnsurePlaceholderMessage(Dictionary<string, V3MessageDefinit\n                     {\n-                        SchemaFormat = \"application/schema+json;version=draft-07\",\n+                        SchemaFormat = \"application/schema+json;version=draft-04\",\n                         Schema = emptyDoc.RootElement.Clone()\n@@ -332,3 +338,3 @@ private static V3SchemaDefinition EmptyObjectSchema()\n             {\n-                SchemaFormat = \"application/schema+json;version=draft-07\",\n+                SchemaFormat = \"application/schema+json;version=draft-04\",\n                 Schema = doc.RootElement.Clone()\n","improvement-type":"Excess Number of Function Arguments"}],"change-level":"warning","is-hotspot?":false,"line":33,"what-changed":"RegistrationModel has 17 arguments, max arguments = 4","how-to-fix":"Start by investigating the responsibilities of the function. Make sure it doesn't do too many things, in which case it should be split into smaller and more cohesive functions. Consider the refactoring [INTRODUCE PARAMETER OBJECT](https://refactoring.com/catalog/introduceParameterObject.html) to encapsulate arguments that refer to the same logical concept.","change-type":"introduced"},{"method":"WriteHandlers","why-it-occurs":"A Complex Method has a high cyclomatic complexity. The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter.SourceGenerators/RegistrationWriter.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"1","loc-deleted":"2","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"NJsonSchema represents class inheritance via an embedded\n{ definitions: { Base: {...} }, allOf: [{$ref: \"#/definitions/Base\"}, ...] }\nblock on every derived message schema. The previous output duplicated the base\ntype once per message and rewrote refs to resolve inside each message's payload\nsubtree.\n\nThis commit lifts all definitions/$defs entries to a single shared pool under\ncomponents.schemas, dedupes by name (first-seen content wins), and rewrites refs\nacross both the message bodies and the hoisted definitions themselves so\ncross-definition refs continue to resolve. A class hierarchy like\nPaymentReceivedEvent and OrderCreatedEvent both inheriting Event now produces a\nsingle Event entry under components.schemas, with each message's allOf pointing\nat #/components/schemas/Event.\n\nSide benefits:\n- Smaller output (~18% on the sample documents).\n- Tooling consuming the spec sees the base type once and can generate a shared\n  parent class on the client side.\n\nDrops the last open CodeScene Complex Method finding by collapsing the\nJsonValueKind.True / False arms in JsonElementToObject into a single\nGetBoolean() arm.\n\nTests rewritten to assert the new ref shape (#/components/schemas/X) and the\npresence of the hoisted entries under Components.Schemas.\n\nCo-Authored-By: Claude <noreply@anthropic.com>","commit-date":"2026-05-16T19:26:51Z","current-rev":"f5003d644","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs","previous-rev":"f38857a51","commit-title":"feat(#3828): hoist shared JSON Schema definitions to components.schemas","language":"C#","id":"a281e7924ca426d627f9bb709daa231724381400","model-score":0.92,"author-id":null,"project-id":32198,"delta-file-score":0.31179166,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\nindex 5bc24c0a6..6022aa42b 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n@@ -191,4 +191,3 @@ private static string SerializeAsYaml(object? tree)\n             JsonValueKind.Number => ReadNumber(element),\n-            JsonValueKind.True => true,\n-            JsonValueKind.False => false,\n+            JsonValueKind.True or JsonValueKind.False => element.GetBoolean(),\n             _ => null,\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"17","loc-deleted":"14","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.08","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"Extract LoadAssemblyTypes (handles ReflectionTypeLoadException) and\nTryGetPublicationTopic (per-type filter + topic resolution) from\nGetPublicationTopicTypes. Targets the last CodeScene advisory finding\n(Complex Method).\n\n46/46 AsyncAPI tests pass on net9.0 and net10.0.\n\nCo-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>","commit-date":"2026-05-17T06:26:45Z","current-rev":"7f8b45280","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs","previous-rev":"6be065b71","commit-title":"refactor(#3828): split GetPublicationTopicTypes for clarity","language":"C#","id":"2287bc725ce16bc068f9b3178aae023ee15b02f8","model-score":0.5,"author-id":null,"project-id":32198,"delta-file-score":0.33626333,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\nindex 4a13108c1..6f8bd66a9 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n@@ -248,6 +248,14 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n         {\n-            Type[] types;\n+            foreach (var type in LoadAssemblyTypes(assembly))\n+            {\n+                var topic = TryGetPublicationTopic(type);\n+                if (topic != null) yield return (type, topic);\n+            }\n+        }\n+\n+        private static IEnumerable<Type> LoadAssemblyTypes(Assembly assembly)\n+        {\n             try\n             {\n-                types = assembly.GetTypes();\n+                return assembly.GetTypes();\n             }\n@@ -255,18 +263,13 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n             {\n-                types = ex.Types.Where(t => t != null).ToArray()!;\n+                return ex.Types.Where(t => t != null).ToArray()!;\n             }\n+        }\n \n-            foreach (var type in types)\n-            {\n-                if (type.IsAbstract || type.IsInterface) continue;\n-                if (!typeof(IRequest).IsAssignableFrom(type)) continue;\n-\n-                var attr = type.GetCustomAttribute<PublicationTopicAttribute>();\n-                if (attr == null) continue;\n-\n-                var topic = attr.Destination?.RoutingKey?.Value;\n-                if (string.IsNullOrEmpty(topic)) continue;\n+        private static string? TryGetPublicationTopic(Type type)\n+        {\n+            if (type.IsAbstract || type.IsInterface) return null;\n+            if (!typeof(IRequest).IsAssignableFrom(type)) return null;\n \n-                yield return (type, topic);\n-            }\n+            var topic = type.GetCustomAttribute<PublicationTopicAttribute>()?.Destination?.RoutingKey?.Value;\n+            return string.IsNullOrEmpty(topic) ? null : topic;\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" /> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ -147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Tom Longhurst","training-data":{"loc-added":"74","loc-deleted":"33","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"9.387218218812514"},"author-email":"30480171+thomhurst@users.noreply.github.com","commit-full-message":"* fix: serialise Dispatcher.Start so consumers are Open before Receive() returns (#4075)\n\nDispatcher.Start() flipped State to DS_RUNNING before opening consumers, so\nReceive()'s busy-wait could return while some consumers were still Shut. A\nShut()/End() racing into that window no-op'd against not-yet-Open consumers\n(Consumer.Shut only acts when State == Open). The control task then opened\nthose consumers, leaving orphan performers parked in Task.Delay forever and\nhanging End() in Task.WaitAny.\n\nOpen every consumer and register its task before publishing DS_RUNNING, and\nreplace the 100ms poll with a TaskCompletionSource that gives callers an\nexplicit happens-before edge over the opens. Add a lock + pending-shut flag\non Consumer so a Shut() arriving before Open() is honoured (defence in depth\nfor any future caller path that opens off the control task).\n\n* chore: trim verbose comments on #4075 fix\n\n* refactor: flatten Dispatcher.Start nesting (CodeScene Bumpy Road)\n\nExtract control-loop body into RunControlLoop, then split into\nTryOpenConsumers, WaitForPerformersToStop, HandleNextStoppedPerformer,\nand RemoveConsumerForTask. Each helper has at most one level of\nconditional nesting, killing the \"Bumpy Road Ahead\" flag on\nPR #4081 without changing behavior.\n\n* refactor: simplify TryOpenConsumers and drop redundant OfType\n\n- Collapse TryOpenConsumers (bool, dead false-branch) into void\n  OpenConsumers; move TCS signalling and try/catch up into\n  RunControlLoop where they belong.\n- Iterate Consumers directly instead of OfType<Consumer>(); the\n  IAmAConsumer interface already exposes Open/Job/JobId.\n\n---------\n\nCo-authored-by: Ian Cooper <ian_hammond_cooper@yahoo.co.uk>","commit-date":"2026-04-26T17:18:29Z","current-rev":"f94246cd8","filename":"Brighter/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs","previous-rev":"3a0528638","commit-title":"fix: Dispatcher shutdown race leaves late-opened Reactor consumers running (#4075) (#4081)","language":"C#","id":"b453d2853c4228105b707fea96483f93cd8ee3c1","model-score":0.1,"author-id":null,"project-id":32198,"delta-file-score":0.84183866,"diff":"diff --git a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\nindex 8abdbe43d..a9ca4b575 100644\n--- a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n+++ b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n@@ -284,63 +284,104 @@ private void Start()\n         {\n-            _controlTask = Task.Factory.StartNew(() =>\n+            // Block Start() callers until every consumer is Open. A Shut()/End() racing in\n+            // immediately after Receive() returns must not see a still-Shut consumer, or the\n+            // late-opened performer leaks and End() hangs forever in Task.WaitAny.\n+            var startup = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);\n+\n+            _controlTask = Task.Factory.StartNew(\n+                () => RunControlLoop(startup),\n+                CancellationToken.None,\n+                TaskCreationOptions.LongRunning,\n+                TaskScheduler.Default);\n+\n+            startup.Task.GetAwaiter().GetResult();\n+        }\n+\n+        private void RunControlLoop(TaskCompletionSource<bool> startup)\n+        {\n+            if (State != DispatcherState.DS_AWAITING && State != DispatcherState.DS_STOPPED)\n             {\n-                if (State == DispatcherState.DS_AWAITING || State == DispatcherState.DS_STOPPED)\n-                {\n-                    Log.DispatcherStarting(s_logger);\n-                    State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+                return;\n+            }\n+\n+            Log.DispatcherStarting(s_logger);\n+\n+            try\n+            {\n+                OpenConsumers();\n+                State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+            }\n+            catch (Exception ex)\n+            {\n+                Log.ErrorOnConsumer(s_logger, ex);\n+                startup.TrySetException(ex);\n+                throw;\n+            }\n+\n+            Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n \n-                    var consumers = Consumers.ToArray();\n-                    consumers.Each(consumer => consumer.Open());\n-                    consumers.Each(consumer => _tasks.TryAdd(consumer.JobId, consumer.Job!));\n+            WaitForPerformersToStop();\n+\n+            State = DispatcherState.DS_STOPPED;\n+            Log.DispatcherStopped(s_logger);\n+        }\n \n-                    Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n+        private void OpenConsumers()\n+        {\n+            foreach (var consumer in Consumers)\n+            {\n+                consumer.Open();\n+                if (consumer.Job is not null)\n+                    _tasks.TryAdd(consumer.JobId, consumer.Job);\n+            }\n+        }\n \n-                    while (_tasks.Any())\n+        private void WaitForPerformersToStop()\n+        {\n+            while (!_tasks.IsEmpty)\n+            {\n+                try\n+                {\n+                    HandleNextStoppedPerformer();\n+                }\n+                catch (AggregateException ae)\n+                {\n+                    ae.Handle(ex =>\n                     {\n-                        try\n-                        {\n-                            var runningTasks = _tasks.Values.ToArray();\n-                            var index = Task.WaitAny(runningTasks);\n-                            var stoppingConsumer = runningTasks[index];\n-                            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n-\n-                            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n-                            if (consumer != null)\n-                            {\n-                                Log.RemovingConsumer(s_logger, consumer.Name);\n-\n-                                if (_consumers.TryRemove(consumer.Name, out consumer))\n-                                {\n-                                    consumer.Dispose();\n-                                }\n-                            }\n-\n-                            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n-                            {\n-                                removedTask?.Dispose();\n-                            }\n-\n-                            stoppingConsumer.Dispose();\n-                        }\n-                        catch (AggregateException ae)\n-                        {\n-                            ae.Handle(ex =>\n-                            {\n-                                Log.ErrorOnConsumer(s_logger, ex);\n-                                return true;\n-                            });\n-                        }\n-                    }\n-\n-                    State = DispatcherState.DS_STOPPED;\n-                    Log.DispatcherStopped(s_logger);\n+                        Log.ErrorOnConsumer(s_logger, ex);\n+                        return true;\n+                    });\n                 }\n-            },\n-            CancellationToken.None,\n-            TaskCreationOptions.LongRunning,\n-            TaskScheduler.Default);\n+            }\n+        }\n+\n+        private void HandleNextStoppedPerformer()\n+        {\n+            var runningTasks = _tasks.Values.ToArray();\n+            var index = Task.WaitAny(runningTasks);\n+            var stoppingConsumer = runningTasks[index];\n+            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n+\n+            RemoveConsumerForTask(stoppingConsumer);\n+\n+            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n+            {\n+                removedTask?.Dispose();\n+            }\n+\n+            stoppingConsumer.Dispose();\n+        }\n+\n+        private void RemoveConsumerForTask(Task stoppingConsumer)\n+        {\n+            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n+            if (consumer is null)\n+                return;\n+\n+            Log.RemovingConsumer(s_logger, consumer.Name);\n \n-            while (State != DispatcherState.DS_RUNNING)\n+            if (_consumers.TryRemove(consumer.Name, out consumer))\n             {\n-                Thread.Sleep(100); //Block main Dispatcher thread whilst control plane starts\n+                consumer.Dispose();\n             }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":false,"line":99,"what-changed":"WriteHandlers has a cyclomatic complexity of 13, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"introduced"},{"method":"WriteHandlers","why-it-occurs":"A Bumpy Road is a function that contains multiple chunks of nested conditional logic inside the same function. The deeper the nesting and the more bumps, the lower the code health.\n\nA bumpy code road represents a lack of encapsulation which becomes an obstacle to comprehension. In imperative languages there’s also an increased risk for feature entanglement, which leads to complex state management. CodeScene considers the following rules for the code health impact: 1) The deeper the nested conditional logic of each bump, the higher the tax on our working memory. 2) The more bumps inside a function, the more expensive it is to refactor as each bump represents a missing abstraction. 3) The larger each bump – that is, the more lines of code it spans – the harder it is to build up a mental model of the function. The nesting depth for what is considered a bump is  levels of conditionals.","name":"Bumpy Road Ahead","file":"src/Paramore.Brighter.SourceGenerators/RegistrationWriter.cs","refactoring-examples":null,"change-level":"warning","is-hotspot?":false,"line":99,"what-changed":"WriteHandlers has 3 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is 2 blocks per function","how-to-fix":"Bumpy Road implementations indicate a lack of encapsulation. Check out the detailed description of the [Bumpy Road code health issue](https://codescene.com/blog/bumpy-road-code-complexity-in-context/).\n\nA Bumpy Road often suggests that the function/method does too many things. The first refactoring step is to identify the different possible responsibilities of the function. Consider extracting those responsibilities into smaller, cohesive, and well-named functions. The [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html) refactoring is the primary response.","change-type":"introduced"},{"method":"TryClassifyInterface","why-it-occurs":"A Complex Method has a high cyclomatic complexity. The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter.SourceGenerators/SemanticModelReader.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"1","loc-deleted":"2","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"NJsonSchema represents class inheritance via an embedded\n{ definitions: { Base: {...} }, allOf: [{$ref: \"#/definitions/Base\"}, ...] }\nblock on every derived message schema. The previous output duplicated the base\ntype once per message and rewrote refs to resolve inside each message's payload\nsubtree.\n\nThis commit lifts all definitions/$defs entries to a single shared pool under\ncomponents.schemas, dedupes by name (first-seen content wins), and rewrites refs\nacross both the message bodies and the hoisted definitions themselves so\ncross-definition refs continue to resolve. A class hierarchy like\nPaymentReceivedEvent and OrderCreatedEvent both inheriting Event now produces a\nsingle Event entry under components.schemas, with each message's allOf pointing\nat #/components/schemas/Event.\n\nSide benefits:\n- Smaller output (~18% on the sample documents).\n- Tooling consuming the spec sees the base type once and can generate a shared\n  parent class on the client side.\n\nDrops the last open CodeScene Complex Method finding by collapsing the\nJsonValueKind.True / False arms in JsonElementToObject into a single\nGetBoolean() arm.\n\nTests rewritten to assert the new ref shape (#/components/schemas/X) and the\npresence of the hoisted entries under Components.Schemas.\n\nCo-Authored-By: Claude <noreply@anthropic.com>","commit-date":"2026-05-16T19:26:51Z","current-rev":"f5003d644","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs","previous-rev":"f38857a51","commit-title":"feat(#3828): hoist shared JSON Schema definitions to components.schemas","language":"C#","id":"a281e7924ca426d627f9bb709daa231724381400","model-score":0.92,"author-id":null,"project-id":32198,"delta-file-score":0.31179166,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\nindex 5bc24c0a6..6022aa42b 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n@@ -191,4 +191,3 @@ private static string SerializeAsYaml(object? tree)\n             JsonValueKind.Number => ReadNumber(element),\n-            JsonValueKind.True => true,\n-            JsonValueKind.False => false,\n+            JsonValueKind.True or JsonValueKind.False => element.GetBoolean(),\n             _ => null,\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"17","loc-deleted":"14","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.08","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"Extract LoadAssemblyTypes (handles ReflectionTypeLoadException) and\nTryGetPublicationTopic (per-type filter + topic resolution) from\nGetPublicationTopicTypes. Targets the last CodeScene advisory finding\n(Complex Method).\n\n46/46 AsyncAPI tests pass on net9.0 and net10.0.\n\nCo-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>","commit-date":"2026-05-17T06:26:45Z","current-rev":"7f8b45280","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs","previous-rev":"6be065b71","commit-title":"refactor(#3828): split GetPublicationTopicTypes for clarity","language":"C#","id":"2287bc725ce16bc068f9b3178aae023ee15b02f8","model-score":0.5,"author-id":null,"project-id":32198,"delta-file-score":0.33626333,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\nindex 4a13108c1..6f8bd66a9 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n@@ -248,6 +248,14 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n         {\n-            Type[] types;\n+            foreach (var type in LoadAssemblyTypes(assembly))\n+            {\n+                var topic = TryGetPublicationTopic(type);\n+                if (topic != null) yield return (type, topic);\n+            }\n+        }\n+\n+        private static IEnumerable<Type> LoadAssemblyTypes(Assembly assembly)\n+        {\n             try\n             {\n-                types = assembly.GetTypes();\n+                return assembly.GetTypes();\n             }\n@@ -255,18 +263,13 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n             {\n-                types = ex.Types.Where(t => t != null).ToArray()!;\n+                return ex.Types.Where(t => t != null).ToArray()!;\n             }\n+        }\n \n-            foreach (var type in types)\n-            {\n-                if (type.IsAbstract || type.IsInterface) continue;\n-                if (!typeof(IRequest).IsAssignableFrom(type)) continue;\n-\n-                var attr = type.GetCustomAttribute<PublicationTopicAttribute>();\n-                if (attr == null) continue;\n-\n-                var topic = attr.Destination?.RoutingKey?.Value;\n-                if (string.IsNullOrEmpty(topic)) continue;\n+        private static string? TryGetPublicationTopic(Type type)\n+        {\n+            if (type.IsAbstract || type.IsInterface) return null;\n+            if (!typeof(IRequest).IsAssignableFrom(type)) return null;\n \n-                yield return (type, topic);\n-            }\n+            var topic = type.GetCustomAttribute<PublicationTopicAttribute>()?.Destination?.RoutingKey?.Value;\n+            return string.IsNullOrEmpty(topic) ? null : topic;\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" /> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ -147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Tom Longhurst","training-data":{"loc-added":"74","loc-deleted":"33","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"9.387218218812514"},"author-email":"30480171+thomhurst@users.noreply.github.com","commit-full-message":"* fix: serialise Dispatcher.Start so consumers are Open before Receive() returns (#4075)\n\nDispatcher.Start() flipped State to DS_RUNNING before opening consumers, so\nReceive()'s busy-wait could return while some consumers were still Shut. A\nShut()/End() racing into that window no-op'd against not-yet-Open consumers\n(Consumer.Shut only acts when State == Open). The control task then opened\nthose consumers, leaving orphan performers parked in Task.Delay forever and\nhanging End() in Task.WaitAny.\n\nOpen every consumer and register its task before publishing DS_RUNNING, and\nreplace the 100ms poll with a TaskCompletionSource that gives callers an\nexplicit happens-before edge over the opens. Add a lock + pending-shut flag\non Consumer so a Shut() arriving before Open() is honoured (defence in depth\nfor any future caller path that opens off the control task).\n\n* chore: trim verbose comments on #4075 fix\n\n* refactor: flatten Dispatcher.Start nesting (CodeScene Bumpy Road)\n\nExtract control-loop body into RunControlLoop, then split into\nTryOpenConsumers, WaitForPerformersToStop, HandleNextStoppedPerformer,\nand RemoveConsumerForTask. Each helper has at most one level of\nconditional nesting, killing the \"Bumpy Road Ahead\" flag on\nPR #4081 without changing behavior.\n\n* refactor: simplify TryOpenConsumers and drop redundant OfType\n\n- Collapse TryOpenConsumers (bool, dead false-branch) into void\n  OpenConsumers; move TCS signalling and try/catch up into\n  RunControlLoop where they belong.\n- Iterate Consumers directly instead of OfType<Consumer>(); the\n  IAmAConsumer interface already exposes Open/Job/JobId.\n\n---------\n\nCo-authored-by: Ian Cooper <ian_hammond_cooper@yahoo.co.uk>","commit-date":"2026-04-26T17:18:29Z","current-rev":"f94246cd8","filename":"Brighter/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs","previous-rev":"3a0528638","commit-title":"fix: Dispatcher shutdown race leaves late-opened Reactor consumers running (#4075) (#4081)","language":"C#","id":"b453d2853c4228105b707fea96483f93cd8ee3c1","model-score":0.1,"author-id":null,"project-id":32198,"delta-file-score":0.84183866,"diff":"diff --git a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\nindex 8abdbe43d..a9ca4b575 100644\n--- a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n+++ b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n@@ -284,63 +284,104 @@ private void Start()\n         {\n-            _controlTask = Task.Factory.StartNew(() =>\n+            // Block Start() callers until every consumer is Open. A Shut()/End() racing in\n+            // immediately after Receive() returns must not see a still-Shut consumer, or the\n+            // late-opened performer leaks and End() hangs forever in Task.WaitAny.\n+            var startup = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);\n+\n+            _controlTask = Task.Factory.StartNew(\n+                () => RunControlLoop(startup),\n+                CancellationToken.None,\n+                TaskCreationOptions.LongRunning,\n+                TaskScheduler.Default);\n+\n+            startup.Task.GetAwaiter().GetResult();\n+        }\n+\n+        private void RunControlLoop(TaskCompletionSource<bool> startup)\n+        {\n+            if (State != DispatcherState.DS_AWAITING && State != DispatcherState.DS_STOPPED)\n             {\n-                if (State == DispatcherState.DS_AWAITING || State == DispatcherState.DS_STOPPED)\n-                {\n-                    Log.DispatcherStarting(s_logger);\n-                    State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+                return;\n+            }\n+\n+            Log.DispatcherStarting(s_logger);\n+\n+            try\n+            {\n+                OpenConsumers();\n+                State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+            }\n+            catch (Exception ex)\n+            {\n+                Log.ErrorOnConsumer(s_logger, ex);\n+                startup.TrySetException(ex);\n+                throw;\n+            }\n+\n+            Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n \n-                    var consumers = Consumers.ToArray();\n-                    consumers.Each(consumer => consumer.Open());\n-                    consumers.Each(consumer => _tasks.TryAdd(consumer.JobId, consumer.Job!));\n+            WaitForPerformersToStop();\n+\n+            State = DispatcherState.DS_STOPPED;\n+            Log.DispatcherStopped(s_logger);\n+        }\n \n-                    Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n+        private void OpenConsumers()\n+        {\n+            foreach (var consumer in Consumers)\n+            {\n+                consumer.Open();\n+                if (consumer.Job is not null)\n+                    _tasks.TryAdd(consumer.JobId, consumer.Job);\n+            }\n+        }\n \n-                    while (_tasks.Any())\n+        private void WaitForPerformersToStop()\n+        {\n+            while (!_tasks.IsEmpty)\n+            {\n+                try\n+                {\n+                    HandleNextStoppedPerformer();\n+                }\n+                catch (AggregateException ae)\n+                {\n+                    ae.Handle(ex =>\n                     {\n-                        try\n-                        {\n-                            var runningTasks = _tasks.Values.ToArray();\n-                            var index = Task.WaitAny(runningTasks);\n-                            var stoppingConsumer = runningTasks[index];\n-                            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n-\n-                            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n-                            if (consumer != null)\n-                            {\n-                                Log.RemovingConsumer(s_logger, consumer.Name);\n-\n-                                if (_consumers.TryRemove(consumer.Name, out consumer))\n-                                {\n-                                    consumer.Dispose();\n-                                }\n-                            }\n-\n-                            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n-                            {\n-                                removedTask?.Dispose();\n-                            }\n-\n-                            stoppingConsumer.Dispose();\n-                        }\n-                        catch (AggregateException ae)\n-                        {\n-                            ae.Handle(ex =>\n-                            {\n-                                Log.ErrorOnConsumer(s_logger, ex);\n-                                return true;\n-                            });\n-                        }\n-                    }\n-\n-                    State = DispatcherState.DS_STOPPED;\n-                    Log.DispatcherStopped(s_logger);\n+                        Log.ErrorOnConsumer(s_logger, ex);\n+                        return true;\n+                    });\n                 }\n-            },\n-            CancellationToken.None,\n-            TaskCreationOptions.LongRunning,\n-            TaskScheduler.Default);\n+            }\n+        }\n+\n+        private void HandleNextStoppedPerformer()\n+        {\n+            var runningTasks = _tasks.Values.ToArray();\n+            var index = Task.WaitAny(runningTasks);\n+            var stoppingConsumer = runningTasks[index];\n+            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n+\n+            RemoveConsumerForTask(stoppingConsumer);\n+\n+            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n+            {\n+                removedTask?.Dispose();\n+            }\n+\n+            stoppingConsumer.Dispose();\n+        }\n+\n+        private void RemoveConsumerForTask(Task stoppingConsumer)\n+        {\n+            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n+            if (consumer is null)\n+                return;\n+\n+            Log.RemovingConsumer(s_logger, consumer.Name);\n \n-            while (State != DispatcherState.DS_RUNNING)\n+            if (_consumers.TryRemove(consumer.Name, out consumer))\n             {\n-                Thread.Sleep(100); //Block main Dispatcher thread whilst control plane starts\n+                consumer.Dispose();\n             }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":false,"line":176,"what-changed":"TryClassifyInterface has a cyclomatic complexity of 12, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"introduced"},{"method":"ReadClass","why-it-occurs":"A Complex Method has a high cyclomatic complexity. The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter.SourceGenerators/SemanticModelReader.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"1","loc-deleted":"2","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"NJsonSchema represents class inheritance via an embedded\n{ definitions: { Base: {...} }, allOf: [{$ref: \"#/definitions/Base\"}, ...] }\nblock on every derived message schema. The previous output duplicated the base\ntype once per message and rewrote refs to resolve inside each message's payload\nsubtree.\n\nThis commit lifts all definitions/$defs entries to a single shared pool under\ncomponents.schemas, dedupes by name (first-seen content wins), and rewrites refs\nacross both the message bodies and the hoisted definitions themselves so\ncross-definition refs continue to resolve. A class hierarchy like\nPaymentReceivedEvent and OrderCreatedEvent both inheriting Event now produces a\nsingle Event entry under components.schemas, with each message's allOf pointing\nat #/components/schemas/Event.\n\nSide benefits:\n- Smaller output (~18% on the sample documents).\n- Tooling consuming the spec sees the base type once and can generate a shared\n  parent class on the client side.\n\nDrops the last open CodeScene Complex Method finding by collapsing the\nJsonValueKind.True / False arms in JsonElementToObject into a single\nGetBoolean() arm.\n\nTests rewritten to assert the new ref shape (#/components/schemas/X) and the\npresence of the hoisted entries under Components.Schemas.\n\nCo-Authored-By: Claude <noreply@anthropic.com>","commit-date":"2026-05-16T19:26:51Z","current-rev":"f5003d644","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs","previous-rev":"f38857a51","commit-title":"feat(#3828): hoist shared JSON Schema definitions to components.schemas","language":"C#","id":"a281e7924ca426d627f9bb709daa231724381400","model-score":0.92,"author-id":null,"project-id":32198,"delta-file-score":0.31179166,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\nindex 5bc24c0a6..6022aa42b 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n@@ -191,4 +191,3 @@ private static string SerializeAsYaml(object? tree)\n             JsonValueKind.Number => ReadNumber(element),\n-            JsonValueKind.True => true,\n-            JsonValueKind.False => false,\n+            JsonValueKind.True or JsonValueKind.False => element.GetBoolean(),\n             _ => null,\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"17","loc-deleted":"14","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.08","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"Extract LoadAssemblyTypes (handles ReflectionTypeLoadException) and\nTryGetPublicationTopic (per-type filter + topic resolution) from\nGetPublicationTopicTypes. Targets the last CodeScene advisory finding\n(Complex Method).\n\n46/46 AsyncAPI tests pass on net9.0 and net10.0.\n\nCo-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>","commit-date":"2026-05-17T06:26:45Z","current-rev":"7f8b45280","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs","previous-rev":"6be065b71","commit-title":"refactor(#3828): split GetPublicationTopicTypes for clarity","language":"C#","id":"2287bc725ce16bc068f9b3178aae023ee15b02f8","model-score":0.5,"author-id":null,"project-id":32198,"delta-file-score":0.33626333,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\nindex 4a13108c1..6f8bd66a9 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n@@ -248,6 +248,14 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n         {\n-            Type[] types;\n+            foreach (var type in LoadAssemblyTypes(assembly))\n+            {\n+                var topic = TryGetPublicationTopic(type);\n+                if (topic != null) yield return (type, topic);\n+            }\n+        }\n+\n+        private static IEnumerable<Type> LoadAssemblyTypes(Assembly assembly)\n+        {\n             try\n             {\n-                types = assembly.GetTypes();\n+                return assembly.GetTypes();\n             }\n@@ -255,18 +263,13 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n             {\n-                types = ex.Types.Where(t => t != null).ToArray()!;\n+                return ex.Types.Where(t => t != null).ToArray()!;\n             }\n+        }\n \n-            foreach (var type in types)\n-            {\n-                if (type.IsAbstract || type.IsInterface) continue;\n-                if (!typeof(IRequest).IsAssignableFrom(type)) continue;\n-\n-                var attr = type.GetCustomAttribute<PublicationTopicAttribute>();\n-                if (attr == null) continue;\n-\n-                var topic = attr.Destination?.RoutingKey?.Value;\n-                if (string.IsNullOrEmpty(topic)) continue;\n+        private static string? TryGetPublicationTopic(Type type)\n+        {\n+            if (type.IsAbstract || type.IsInterface) return null;\n+            if (!typeof(IRequest).IsAssignableFrom(type)) return null;\n \n-                yield return (type, topic);\n-            }\n+            var topic = type.GetCustomAttribute<PublicationTopicAttribute>()?.Destination?.RoutingKey?.Value;\n+            return string.IsNullOrEmpty(topic) ? null : topic;\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" /> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ -147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Tom Longhurst","training-data":{"loc-added":"74","loc-deleted":"33","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"9.387218218812514"},"author-email":"30480171+thomhurst@users.noreply.github.com","commit-full-message":"* fix: serialise Dispatcher.Start so consumers are Open before Receive() returns (#4075)\n\nDispatcher.Start() flipped State to DS_RUNNING before opening consumers, so\nReceive()'s busy-wait could return while some consumers were still Shut. A\nShut()/End() racing into that window no-op'd against not-yet-Open consumers\n(Consumer.Shut only acts when State == Open). The control task then opened\nthose consumers, leaving orphan performers parked in Task.Delay forever and\nhanging End() in Task.WaitAny.\n\nOpen every consumer and register its task before publishing DS_RUNNING, and\nreplace the 100ms poll with a TaskCompletionSource that gives callers an\nexplicit happens-before edge over the opens. Add a lock + pending-shut flag\non Consumer so a Shut() arriving before Open() is honoured (defence in depth\nfor any future caller path that opens off the control task).\n\n* chore: trim verbose comments on #4075 fix\n\n* refactor: flatten Dispatcher.Start nesting (CodeScene Bumpy Road)\n\nExtract control-loop body into RunControlLoop, then split into\nTryOpenConsumers, WaitForPerformersToStop, HandleNextStoppedPerformer,\nand RemoveConsumerForTask. Each helper has at most one level of\nconditional nesting, killing the \"Bumpy Road Ahead\" flag on\nPR #4081 without changing behavior.\n\n* refactor: simplify TryOpenConsumers and drop redundant OfType\n\n- Collapse TryOpenConsumers (bool, dead false-branch) into void\n  OpenConsumers; move TCS signalling and try/catch up into\n  RunControlLoop where they belong.\n- Iterate Consumers directly instead of OfType<Consumer>(); the\n  IAmAConsumer interface already exposes Open/Job/JobId.\n\n---------\n\nCo-authored-by: Ian Cooper <ian_hammond_cooper@yahoo.co.uk>","commit-date":"2026-04-26T17:18:29Z","current-rev":"f94246cd8","filename":"Brighter/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs","previous-rev":"3a0528638","commit-title":"fix: Dispatcher shutdown race leaves late-opened Reactor consumers running (#4075) (#4081)","language":"C#","id":"b453d2853c4228105b707fea96483f93cd8ee3c1","model-score":0.1,"author-id":null,"project-id":32198,"delta-file-score":0.84183866,"diff":"diff --git a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\nindex 8abdbe43d..a9ca4b575 100644\n--- a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n+++ b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n@@ -284,63 +284,104 @@ private void Start()\n         {\n-            _controlTask = Task.Factory.StartNew(() =>\n+            // Block Start() callers until every consumer is Open. A Shut()/End() racing in\n+            // immediately after Receive() returns must not see a still-Shut consumer, or the\n+            // late-opened performer leaks and End() hangs forever in Task.WaitAny.\n+            var startup = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);\n+\n+            _controlTask = Task.Factory.StartNew(\n+                () => RunControlLoop(startup),\n+                CancellationToken.None,\n+                TaskCreationOptions.LongRunning,\n+                TaskScheduler.Default);\n+\n+            startup.Task.GetAwaiter().GetResult();\n+        }\n+\n+        private void RunControlLoop(TaskCompletionSource<bool> startup)\n+        {\n+            if (State != DispatcherState.DS_AWAITING && State != DispatcherState.DS_STOPPED)\n             {\n-                if (State == DispatcherState.DS_AWAITING || State == DispatcherState.DS_STOPPED)\n-                {\n-                    Log.DispatcherStarting(s_logger);\n-                    State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+                return;\n+            }\n+\n+            Log.DispatcherStarting(s_logger);\n+\n+            try\n+            {\n+                OpenConsumers();\n+                State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+            }\n+            catch (Exception ex)\n+            {\n+                Log.ErrorOnConsumer(s_logger, ex);\n+                startup.TrySetException(ex);\n+                throw;\n+            }\n+\n+            Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n \n-                    var consumers = Consumers.ToArray();\n-                    consumers.Each(consumer => consumer.Open());\n-                    consumers.Each(consumer => _tasks.TryAdd(consumer.JobId, consumer.Job!));\n+            WaitForPerformersToStop();\n+\n+            State = DispatcherState.DS_STOPPED;\n+            Log.DispatcherStopped(s_logger);\n+        }\n \n-                    Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n+        private void OpenConsumers()\n+        {\n+            foreach (var consumer in Consumers)\n+            {\n+                consumer.Open();\n+                if (consumer.Job is not null)\n+                    _tasks.TryAdd(consumer.JobId, consumer.Job);\n+            }\n+        }\n \n-                    while (_tasks.Any())\n+        private void WaitForPerformersToStop()\n+        {\n+            while (!_tasks.IsEmpty)\n+            {\n+                try\n+                {\n+                    HandleNextStoppedPerformer();\n+                }\n+                catch (AggregateException ae)\n+                {\n+                    ae.Handle(ex =>\n                     {\n-                        try\n-                        {\n-                            var runningTasks = _tasks.Values.ToArray();\n-                            var index = Task.WaitAny(runningTasks);\n-                            var stoppingConsumer = runningTasks[index];\n-                            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n-\n-                            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n-                            if (consumer != null)\n-                            {\n-                                Log.RemovingConsumer(s_logger, consumer.Name);\n-\n-                                if (_consumers.TryRemove(consumer.Name, out consumer))\n-                                {\n-                                    consumer.Dispose();\n-                                }\n-                            }\n-\n-                            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n-                            {\n-                                removedTask?.Dispose();\n-                            }\n-\n-                            stoppingConsumer.Dispose();\n-                        }\n-                        catch (AggregateException ae)\n-                        {\n-                            ae.Handle(ex =>\n-                            {\n-                                Log.ErrorOnConsumer(s_logger, ex);\n-                                return true;\n-                            });\n-                        }\n-                    }\n-\n-                    State = DispatcherState.DS_STOPPED;\n-                    Log.DispatcherStopped(s_logger);\n+                        Log.ErrorOnConsumer(s_logger, ex);\n+                        return true;\n+                    });\n                 }\n-            },\n-            CancellationToken.None,\n-            TaskCreationOptions.LongRunning,\n-            TaskScheduler.Default);\n+            }\n+        }\n+\n+        private void HandleNextStoppedPerformer()\n+        {\n+            var runningTasks = _tasks.Values.ToArray();\n+            var index = Task.WaitAny(runningTasks);\n+            var stoppingConsumer = runningTasks[index];\n+            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n+\n+            RemoveConsumerForTask(stoppingConsumer);\n+\n+            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n+            {\n+                removedTask?.Dispose();\n+            }\n+\n+            stoppingConsumer.Dispose();\n+        }\n+\n+        private void RemoveConsumerForTask(Task stoppingConsumer)\n+        {\n+            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n+            if (consumer is null)\n+                return;\n+\n+            Log.RemovingConsumer(s_logger, consumer.Name);\n \n-            while (State != DispatcherState.DS_RUNNING)\n+            if (_consumers.TryRemove(consumer.Name, out consumer))\n             {\n-                Thread.Sleep(100); //Block main Dispatcher thread whilst control plane starts\n+                consumer.Dispose();\n             }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":false,"line":70,"what-changed":"ReadClass has a cyclomatic complexity of 10, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"introduced"},{"method":"AccessibilityModifier","why-it-occurs":"A Complex Method has a high cyclomatic complexity. The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter.SourceGenerators/SemanticModelReader.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"1","loc-deleted":"2","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"NJsonSchema represents class inheritance via an embedded\n{ definitions: { Base: {...} }, allOf: [{$ref: \"#/definitions/Base\"}, ...] }\nblock on every derived message schema. The previous output duplicated the base\ntype once per message and rewrote refs to resolve inside each message's payload\nsubtree.\n\nThis commit lifts all definitions/$defs entries to a single shared pool under\ncomponents.schemas, dedupes by name (first-seen content wins), and rewrites refs\nacross both the message bodies and the hoisted definitions themselves so\ncross-definition refs continue to resolve. A class hierarchy like\nPaymentReceivedEvent and OrderCreatedEvent both inheriting Event now produces a\nsingle Event entry under components.schemas, with each message's allOf pointing\nat #/components/schemas/Event.\n\nSide benefits:\n- Smaller output (~18% on the sample documents).\n- Tooling consuming the spec sees the base type once and can generate a shared\n  parent class on the client side.\n\nDrops the last open CodeScene Complex Method finding by collapsing the\nJsonValueKind.True / False arms in JsonElementToObject into a single\nGetBoolean() arm.\n\nTests rewritten to assert the new ref shape (#/components/schemas/X) and the\npresence of the hoisted entries under Components.Schemas.\n\nCo-Authored-By: Claude <noreply@anthropic.com>","commit-date":"2026-05-16T19:26:51Z","current-rev":"f5003d644","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs","previous-rev":"f38857a51","commit-title":"feat(#3828): hoist shared JSON Schema definitions to components.schemas","language":"C#","id":"a281e7924ca426d627f9bb709daa231724381400","model-score":0.92,"author-id":null,"project-id":32198,"delta-file-score":0.31179166,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\nindex 5bc24c0a6..6022aa42b 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n@@ -191,4 +191,3 @@ private static string SerializeAsYaml(object? tree)\n             JsonValueKind.Number => ReadNumber(element),\n-            JsonValueKind.True => true,\n-            JsonValueKind.False => false,\n+            JsonValueKind.True or JsonValueKind.False => element.GetBoolean(),\n             _ => null,\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"17","loc-deleted":"14","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.08","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"Extract LoadAssemblyTypes (handles ReflectionTypeLoadException) and\nTryGetPublicationTopic (per-type filter + topic resolution) from\nGetPublicationTopicTypes. Targets the last CodeScene advisory finding\n(Complex Method).\n\n46/46 AsyncAPI tests pass on net9.0 and net10.0.\n\nCo-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>","commit-date":"2026-05-17T06:26:45Z","current-rev":"7f8b45280","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs","previous-rev":"6be065b71","commit-title":"refactor(#3828): split GetPublicationTopicTypes for clarity","language":"C#","id":"2287bc725ce16bc068f9b3178aae023ee15b02f8","model-score":0.5,"author-id":null,"project-id":32198,"delta-file-score":0.33626333,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\nindex 4a13108c1..6f8bd66a9 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n@@ -248,6 +248,14 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n         {\n-            Type[] types;\n+            foreach (var type in LoadAssemblyTypes(assembly))\n+            {\n+                var topic = TryGetPublicationTopic(type);\n+                if (topic != null) yield return (type, topic);\n+            }\n+        }\n+\n+        private static IEnumerable<Type> LoadAssemblyTypes(Assembly assembly)\n+        {\n             try\n             {\n-                types = assembly.GetTypes();\n+                return assembly.GetTypes();\n             }\n@@ -255,18 +263,13 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n             {\n-                types = ex.Types.Where(t => t != null).ToArray()!;\n+                return ex.Types.Where(t => t != null).ToArray()!;\n             }\n+        }\n \n-            foreach (var type in types)\n-            {\n-                if (type.IsAbstract || type.IsInterface) continue;\n-                if (!typeof(IRequest).IsAssignableFrom(type)) continue;\n-\n-                var attr = type.GetCustomAttribute<PublicationTopicAttribute>();\n-                if (attr == null) continue;\n-\n-                var topic = attr.Destination?.RoutingKey?.Value;\n-                if (string.IsNullOrEmpty(topic)) continue;\n+        private static string? TryGetPublicationTopic(Type type)\n+        {\n+            if (type.IsAbstract || type.IsInterface) return null;\n+            if (!typeof(IRequest).IsAssignableFrom(type)) return null;\n \n-                yield return (type, topic);\n-            }\n+            var topic = type.GetCustomAttribute<PublicationTopicAttribute>()?.Destination?.RoutingKey?.Value;\n+            return string.IsNullOrEmpty(topic) ? null : topic;\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" /> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ -147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Tom Longhurst","training-data":{"loc-added":"74","loc-deleted":"33","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"9.387218218812514"},"author-email":"30480171+thomhurst@users.noreply.github.com","commit-full-message":"* fix: serialise Dispatcher.Start so consumers are Open before Receive() returns (#4075)\n\nDispatcher.Start() flipped State to DS_RUNNING before opening consumers, so\nReceive()'s busy-wait could return while some consumers were still Shut. A\nShut()/End() racing into that window no-op'd against not-yet-Open consumers\n(Consumer.Shut only acts when State == Open). The control task then opened\nthose consumers, leaving orphan performers parked in Task.Delay forever and\nhanging End() in Task.WaitAny.\n\nOpen every consumer and register its task before publishing DS_RUNNING, and\nreplace the 100ms poll with a TaskCompletionSource that gives callers an\nexplicit happens-before edge over the opens. Add a lock + pending-shut flag\non Consumer so a Shut() arriving before Open() is honoured (defence in depth\nfor any future caller path that opens off the control task).\n\n* chore: trim verbose comments on #4075 fix\n\n* refactor: flatten Dispatcher.Start nesting (CodeScene Bumpy Road)\n\nExtract control-loop body into RunControlLoop, then split into\nTryOpenConsumers, WaitForPerformersToStop, HandleNextStoppedPerformer,\nand RemoveConsumerForTask. Each helper has at most one level of\nconditional nesting, killing the \"Bumpy Road Ahead\" flag on\nPR #4081 without changing behavior.\n\n* refactor: simplify TryOpenConsumers and drop redundant OfType\n\n- Collapse TryOpenConsumers (bool, dead false-branch) into void\n  OpenConsumers; move TCS signalling and try/catch up into\n  RunControlLoop where they belong.\n- Iterate Consumers directly instead of OfType<Consumer>(); the\n  IAmAConsumer interface already exposes Open/Job/JobId.\n\n---------\n\nCo-authored-by: Ian Cooper <ian_hammond_cooper@yahoo.co.uk>","commit-date":"2026-04-26T17:18:29Z","current-rev":"f94246cd8","filename":"Brighter/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs","previous-rev":"3a0528638","commit-title":"fix: Dispatcher shutdown race leaves late-opened Reactor consumers running (#4075) (#4081)","language":"C#","id":"b453d2853c4228105b707fea96483f93cd8ee3c1","model-score":0.1,"author-id":null,"project-id":32198,"delta-file-score":0.84183866,"diff":"diff --git a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\nindex 8abdbe43d..a9ca4b575 100644\n--- a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n+++ b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n@@ -284,63 +284,104 @@ private void Start()\n         {\n-            _controlTask = Task.Factory.StartNew(() =>\n+            // Block Start() callers until every consumer is Open. A Shut()/End() racing in\n+            // immediately after Receive() returns must not see a still-Shut consumer, or the\n+            // late-opened performer leaks and End() hangs forever in Task.WaitAny.\n+            var startup = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);\n+\n+            _controlTask = Task.Factory.StartNew(\n+                () => RunControlLoop(startup),\n+                CancellationToken.None,\n+                TaskCreationOptions.LongRunning,\n+                TaskScheduler.Default);\n+\n+            startup.Task.GetAwaiter().GetResult();\n+        }\n+\n+        private void RunControlLoop(TaskCompletionSource<bool> startup)\n+        {\n+            if (State != DispatcherState.DS_AWAITING && State != DispatcherState.DS_STOPPED)\n             {\n-                if (State == DispatcherState.DS_AWAITING || State == DispatcherState.DS_STOPPED)\n-                {\n-                    Log.DispatcherStarting(s_logger);\n-                    State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+                return;\n+            }\n+\n+            Log.DispatcherStarting(s_logger);\n+\n+            try\n+            {\n+                OpenConsumers();\n+                State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+            }\n+            catch (Exception ex)\n+            {\n+                Log.ErrorOnConsumer(s_logger, ex);\n+                startup.TrySetException(ex);\n+                throw;\n+            }\n+\n+            Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n \n-                    var consumers = Consumers.ToArray();\n-                    consumers.Each(consumer => consumer.Open());\n-                    consumers.Each(consumer => _tasks.TryAdd(consumer.JobId, consumer.Job!));\n+            WaitForPerformersToStop();\n+\n+            State = DispatcherState.DS_STOPPED;\n+            Log.DispatcherStopped(s_logger);\n+        }\n \n-                    Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n+        private void OpenConsumers()\n+        {\n+            foreach (var consumer in Consumers)\n+            {\n+                consumer.Open();\n+                if (consumer.Job is not null)\n+                    _tasks.TryAdd(consumer.JobId, consumer.Job);\n+            }\n+        }\n \n-                    while (_tasks.Any())\n+        private void WaitForPerformersToStop()\n+        {\n+            while (!_tasks.IsEmpty)\n+            {\n+                try\n+                {\n+                    HandleNextStoppedPerformer();\n+                }\n+                catch (AggregateException ae)\n+                {\n+                    ae.Handle(ex =>\n                     {\n-                        try\n-                        {\n-                            var runningTasks = _tasks.Values.ToArray();\n-                            var index = Task.WaitAny(runningTasks);\n-                            var stoppingConsumer = runningTasks[index];\n-                            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n-\n-                            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n-                            if (consumer != null)\n-                            {\n-                                Log.RemovingConsumer(s_logger, consumer.Name);\n-\n-                                if (_consumers.TryRemove(consumer.Name, out consumer))\n-                                {\n-                                    consumer.Dispose();\n-                                }\n-                            }\n-\n-                            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n-                            {\n-                                removedTask?.Dispose();\n-                            }\n-\n-                            stoppingConsumer.Dispose();\n-                        }\n-                        catch (AggregateException ae)\n-                        {\n-                            ae.Handle(ex =>\n-                            {\n-                                Log.ErrorOnConsumer(s_logger, ex);\n-                                return true;\n-                            });\n-                        }\n-                    }\n-\n-                    State = DispatcherState.DS_STOPPED;\n-                    Log.DispatcherStopped(s_logger);\n+                        Log.ErrorOnConsumer(s_logger, ex);\n+                        return true;\n+                    });\n                 }\n-            },\n-            CancellationToken.None,\n-            TaskCreationOptions.LongRunning,\n-            TaskScheduler.Default);\n+            }\n+        }\n+\n+        private void HandleNextStoppedPerformer()\n+        {\n+            var runningTasks = _tasks.Values.ToArray();\n+            var index = Task.WaitAny(runningTasks);\n+            var stoppingConsumer = runningTasks[index];\n+            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n+\n+            RemoveConsumerForTask(stoppingConsumer);\n+\n+            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n+            {\n+                removedTask?.Dispose();\n+            }\n+\n+            stoppingConsumer.Dispose();\n+        }\n+\n+        private void RemoveConsumerForTask(Task stoppingConsumer)\n+        {\n+            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n+            if (consumer is null)\n+                return;\n+\n+            Log.RemovingConsumer(s_logger, consumer.Name);\n \n-            while (State != DispatcherState.DS_RUNNING)\n+            if (_consumers.TryRemove(consumer.Name, out consumer))\n             {\n-                Thread.Sleep(100); //Block main Dispatcher thread whilst control plane starts\n+                consumer.Dispose();\n             }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":false,"line":298,"what-changed":"AccessibilityModifier has a cyclomatic complexity of 9, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"introduced"},{"method":"IsClassifiable","why-it-occurs":"A complex conditional is an expression inside a branch such as an <code>if</code>-statmeent which consists of multiple, logical operations. Example: <code>if (x.started() && y.running())</code>.Complex conditionals make the code even harder to read, and contribute to the Complex Method code smell. Encapsulate them.","name":"Complex Conditional","file":"src/Paramore.Brighter.SourceGenerators/SemanticModelReader.cs","refactoring-examples":[{"diff":"diff --git a/complex_conditional.js b/complex_conditional.js\nindex c43da09584..94259ce874 100644\n--- a/complex_conditional.js\n+++ b/complex_conditional.js\n@@ -1,16 +1,34 @@\n function messageReceived(message, timeReceived) {\n-   // Ignore all messages which aren't from known customers:\n-   if (!message.sender &&\n-       customers.getId(message.name) == null) {\n+   // Refactoring #1: encapsulate the business rule in a\n+   // function. A clear name replaces the need for the comment:\n+   if (!knownCustomer(message)) {\n      log('spam received -- ignoring');\n      return;\n    }\n \n-  // Provide an auto-reply when outside business hours:\n-  if ((timeReceived.getHours() > 17) ||\n-      (timeReceived.getHours() < 8)) {\n+  // Refactoring #2: encapsulate the business rule.\n+  // Again, note how a clear function name replaces the\n+  // need for a code comment:\n+  if (outsideBusinessHours(timeReceived)) {\n     return autoReplyTo(message);\n   }\n \n   pingAgentFor(message);\n+}\n+\n+function outsideBusinessHours(timeReceived) {\n+  // Refactoring #3: replace magic numbers with\n+  // symbols that communicate with the code reader:\n+  const closingHour = 17;\n+  const openingHour = 8;\n+\n+  const hours = timeReceived.getHours();\n+\n+  // Refactoring #4: simple conditional rules can\n+  // be further clarified by introducing a variable:\n+  const afterClosing = hours > closingHour;\n+  const beforeOpening = hours < openingHour;\n+\n+  // Yeah -- look how clear the business rule is now!\n+  return afterClosing || beforeOpening;\n }\n\\ No newline at end of file\n","language":"c#","improvement-type":"Complex Conditional"}],"change-level":"warning","is-hotspot?":false,"line":227,"what-changed":"IsClassifiable has 1 complex conditionals with 2 branches, threshold = 2","how-to-fix":"Apply the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring so that the complex conditional is encapsulated in a separate function with a good name that captures the business rule. Optionally, for simple expressions, introduce a new variable which holds the result of the complex conditional.","change-type":"introduced"},{"method":"TryClassifyInterface","why-it-occurs":"A Bumpy Road is a function that contains multiple chunks of nested conditional logic inside the same function. The deeper the nesting and the more bumps, the lower the code health.\n\nA bumpy code road represents a lack of encapsulation which becomes an obstacle to comprehension. In imperative languages there’s also an increased risk for feature entanglement, which leads to complex state management. CodeScene considers the following rules for the code health impact: 1) The deeper the nested conditional logic of each bump, the higher the tax on our working memory. 2) The more bumps inside a function, the more expensive it is to refactor as each bump represents a missing abstraction. 3) The larger each bump – that is, the more lines of code it spans – the harder it is to build up a mental model of the function. The nesting depth for what is considered a bump is  levels of conditionals.","name":"Bumpy Road Ahead","file":"src/Paramore.Brighter.SourceGenerators/SemanticModelReader.cs","refactoring-examples":null,"change-level":"warning","is-hotspot?":false,"line":176,"what-changed":"TryClassifyInterface has 3 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is 2 blocks per function","how-to-fix":"Bumpy Road implementations indicate a lack of encapsulation. Check out the detailed description of the [Bumpy Road code health issue](https://codescene.com/blog/bumpy-road-code-complexity-in-context/).\n\nA Bumpy Road often suggests that the function/method does too many things. The first refactoring step is to identify the different possible responsibilities of the function. Consider extracting those responsibilities into smaller, cohesive, and well-named functions. The [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html) refactoring is the primary response.","change-type":"introduced"},{"why-it-occurs":"Overall Code Complexity is measured by the mean cyclomatic complexity across all functions in the file. The lower the number, the better.\n\nCyclomatic complexity is a function level metric that measures the number of logical branches (if-else, loops, etc.). Cyclomatic complexity is a rough complexity measure, but useful as a way of estimating the minimum number of unit tests you would need. As such, prefer functions with low cyclomatic complexity (2-3 branches).","name":"Overall Code Complexity","file":"src/Paramore.Brighter.SourceGenerators/SemanticModelReader.cs","refactoring-examples":null,"change-level":"warning","is-hotspot?":false,"what-changed":"This module has a mean cyclomatic complexity of 4.22 across 18 functions. The mean complexity threshold is 4","how-to-fix":"You address the overall cyclomatic complexity by a) modularizing the code, and b) abstract away the complexity. Let's look at some examples:\n\nModularizing the Code: Do an X-Ray and inspect the local hotspots. Are there any complex conditional expressions? If yes, then do a [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring. Extract the conditional logic into a separate function and put a good name on that function. This clarifies the intent and makes the original function easier to read. Repeat until all complex conditional expressions have been simplified.\n\n","change-type":"introduced"},{"method":"TryClassifyInterface","why-it-occurs":"Functions with many arguments indicate either a) low cohesion where the function has too many responsibilities, or b) a missing abstraction that encapsulates those arguments.\n\nThe threshold for the C# language is 4 function arguments.","name":"Excess Number of Function Arguments","file":"src/Paramore.Brighter.SourceGenerators/SemanticModelReader.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"20","loc-deleted":"14","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"7.534418959981518"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"JSON and YAML outputs previously described different documents because Neuroglia\n4.20.0 applies different default naming conventions in its JSON and YAML serialisers,\nand embedded JSON Schema fragments emitted PascalCase property names that did not\nmatch Brighter's camelCase wire format.\n\nLibrary changes:\n\n- Derive YAML from the JSON tree we just wrote. Guarantees both formats describe the\n  same document, regardless of upstream serialiser config drift.\n- Strip Neuroglia internals (isReference) and empty collections from both outputs.\n- Switch NJsonSchema generator to System.Text.Json reflection with camelCase property\n  policy and JsonStringEnumConverter, mirroring JsonSerialisationOptions.Options. The\n  schema now describes what JsonMessageMapper actually puts on the wire.\n- Declare draft-04 in SchemaFormat to match what NJsonSchema 11.x emits\n  (JsonSchema.ToJson hardcodes the dialect).\n- Refactor ProcessSourceAsync to take a MessageSource record, resolving the CodeScene\n  excess-arguments check.\n- Backfill missing XML docs on AsyncApiOptions and fix a stale paramref on\n  IAmASchemaGenerator.\n\nSample fix:\n\n- Kafka sample no longer requires a live broker for --generate-asyncapi.\n  KafkaProducerRegistryFactory.Create connects eagerly, so build it only on the\n  runtime path; assembly scanning still surfaces publications in the doc.\n\nTests: 46/46 pass on net9.0 and net10.0.\n\nCo-Authored-By: Claude <noreply@anthropic.com>","commit-date":"2026-05-16T19:01:42Z","current-rev":"5e03da283","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs","previous-rev":"70b03a0ee","commit-title":"fix(#3828): align AsyncAPI output with wire format, eliminate JSON/YAML divergence","language":"C#","id":"f49c3320ae617404ccf0324e7fe2dad4565432c7","model-score":0.28,"author-id":null,"project-id":32198,"delta-file-score":0.22637527,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\nindex 2a306a512..bd7ae8957 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n@@ -51,2 +51,10 @@ private sealed record GenerationContext(\n \n+        // Inputs that vary per call into ProcessSourceAsync. Grouping them keeps the method\n+        // signature small (and satisfies the CodeScene \"Excess Number of Function Arguments\"\n+        // check) without flattening the call sites with positional parameters.\n+        private sealed record MessageSource(\n+            string Address,\n+            V3OperationAction Action,\n+            Type? RequestType);\n+\n         private readonly AsyncApiOptions _options;\n@@ -116,3 +124,3 @@ private async Task AddSubscriptionsAsync(\n                 await ProcessSourceAsync(\n-                    subscription.RoutingKey.Value, V3OperationAction.Receive, subscription.RequestType,\n+                    new MessageSource(subscription.RoutingKey.Value, V3OperationAction.Receive, subscription.RequestType),\n                     context, ct).ConfigureAwait(false);\n@@ -133,3 +141,3 @@ private async Task AddPublicationsAsync(\n                 await ProcessSourceAsync(\n-                    publication.Topic.Value, V3OperationAction.Send, publication.RequestType,\n+                    new MessageSource(publication.Topic.Value, V3OperationAction.Send, publication.RequestType),\n                     context, ct).ConfigureAwait(false);\n@@ -139,5 +147,3 @@ await ProcessSourceAsync(\n         private async Task ProcessSourceAsync(\n-            string address,\n-            V3OperationAction action,\n-            Type? requestType,\n+            MessageSource source,\n             GenerationContext context,\n@@ -145,11 +151,11 @@ private async Task ProcessSourceAsync(\n         {\n-            var channelId = SanitizeChannelId(address);\n+            var channelId = SanitizeChannelId(source.Address);\n \n-            EnsureChannel(context.Channels, channelId, address);\n+            EnsureChannel(context.Channels, channelId, source.Address);\n \n             string messageName;\n-            if (requestType != null)\n+            if (source.RequestType != null)\n             {\n-                messageName = requestType.Name;\n-                await EnsureMessageAsync(context.Messages, messageName, requestType, ct).ConfigureAwait(false);\n+                messageName = source.RequestType.Name;\n+                await EnsureMessageAsync(context.Messages, messageName, source.RequestType, ct).ConfigureAwait(false);\n             }\n@@ -163,3 +169,3 @@ private async Task ProcessSourceAsync(\n \n-            var actionString = action == V3OperationAction.Send ? \"send\" : \"receive\";\n+            var actionString = source.Action == V3OperationAction.Send ? \"send\" : \"receive\";\n             context.CoveredChannelActions.Add((channelId, actionString));\n@@ -169,3 +175,3 @@ private async Task ProcessSourceAsync(\n             {\n-                Action = action,\n+                Action = source.Action,\n                 Channel = new V3ReferenceDefinition { Reference = $\"#/channels/{channelId}\" },\n@@ -290,3 +296,3 @@ private static void EnsurePlaceholderMessage(Dictionary<string, V3MessageDefinit\n                     {\n-                        SchemaFormat = \"application/schema+json;version=draft-07\",\n+                        SchemaFormat = \"application/schema+json;version=draft-04\",\n                         Schema = emptyDoc.RootElement.Clone()\n@@ -332,3 +338,3 @@ private static V3SchemaDefinition EmptyObjectSchema()\n             {\n-                SchemaFormat = \"application/schema+json;version=draft-07\",\n+                SchemaFormat = \"application/schema+json;version=draft-04\",\n                 Schema = doc.RootElement.Clone()\n","improvement-type":"Excess Number of Function Arguments"}],"change-level":"warning","is-hotspot?":false,"line":176,"what-changed":"TryClassifyInterface has 5 arguments, max arguments = 4","how-to-fix":"Start by investigating the responsibilities of the function. Make sure it doesn't do too many things, in which case it should be split into smaller and more cohesive functions. Consider the refactoring [INTRODUCE PARAMETER OBJECT](https://refactoring.com/catalog/introduceParameterObject.html) to encapsulate arguments that refer to the same logical concept.","change-type":"introduced"}]},"positive-impact-count":0,"repo":"Brighter","code-health":9.147574710880331,"version":"3.0","authors":["Stuart Lang"],"directives":{"added":[],"removed":[]},"positive-findings":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"notices":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"external-review-provider":"GitHub"},"analysistime":"2026-05-18T23:21:06.000Z","project-name":"Brighter","repository":"https://github.com/BrighterCommand/Brighter.git"}}