{"results":{"result":{"added-files":{"code-health":0.0,"old-code-health":0.0,"files":[]},"external-review-url":"https://github.com/BrighterCommand/Brighter/pull/4155","old-code-health":9.144789969077635,"modified-files":{"code-health":9.017914425049959,"old-code-health":9.144789969077635,"files":[{"file":"src/Paramore.Brighter.BoxProvisioning.MsSql/MsSqlBoxDetectionHelper.cs","loc":134,"old-loc":130,"code-health":8.283981080161325,"old-code-health":8.545379580978913},{"file":"src/Paramore.Brighter.BoxProvisioning.MsSql/MsSqlBoxMigrationRunner.cs","loc":278,"old-loc":181,"code-health":8.816158827775617,"old-code-health":8.816158827775617},{"file":"src/Paramore.Brighter.BoxProvisioning.MsSql/MsSqlBoxProvisioningExtensions.cs","loc":130,"old-loc":130,"code-health":9.6882083290695,"old-code-health":9.6882083290695},{"file":"src/Paramore.Brighter.BoxProvisioning.MySql/MySqlBoxDetectionHelper.cs","loc":117,"old-loc":115,"code-health":8.816158827775617,"old-code-health":9.096655465156704},{"file":"src/Paramore.Brighter.BoxProvisioning.MySql/MySqlBoxMigrationRunner.cs","loc":177,"old-loc":173,"code-health":8.816158827775617,"old-code-health":8.816158827775617},{"file":"src/Paramore.Brighter.BoxProvisioning.MySql/MySqlBoxProvisioningExtensions.cs","loc":132,"old-loc":132,"code-health":9.6882083290695,"old-code-health":9.6882083290695},{"file":"src/Paramore.Brighter.BoxProvisioning.PostgreSql/PostgreSqlBoxDetectionHelper.cs","loc":164,"old-loc":154,"code-health":8.031638343271364,"old-code-health":8.545379580978913},{"file":"src/Paramore.Brighter.BoxProvisioning.PostgreSql/PostgreSqlBoxMigrationRunner.cs","loc":286,"old-loc":189,"code-health":8.545379580978913,"old-code-health":8.816158827775617},{"file":"src/Paramore.Brighter.BoxProvisioning.PostgreSql/PostgreSqlBoxProvisioningExtensions.cs","loc":130,"old-loc":130,"code-health":9.6882083290695,"old-code-health":9.6882083290695},{"file":"src/Paramore.Brighter.BoxProvisioning.Spanner/SpannerBoxDetectionHelper.cs","loc":83,"ol
d-loc":81,"code-health":9.6882083290695,"old-code-health":9.6882083290695},{"file":"src/Paramore.Brighter.BoxProvisioning.Spanner/SpannerInboxProvisioner.cs","loc":82,"old-loc":82,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.BoxProvisioning.Spanner/SpannerOutboxProvisioner.cs","loc":86,"old-loc":86,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.BoxProvisioning.Sqlite/SqliteBoxDetectionHelper.cs","loc":110,"old-loc":108,"code-health":8.816158827775617,"old-code-health":8.545379580978913},{"file":"src/Paramore.Brighter.BoxProvisioning.Sqlite/SqliteBoxMigrationRunner.cs","loc":214,"old-loc":210,"code-health":8.816158827775617,"old-code-health":8.816158827775617},{"file":"src/Paramore.Brighter.BoxProvisioning.Sqlite/SqliteBoxProvisioningExtensions.cs","loc":136,"old-loc":136,"code-health":9.6882083290695,"old-code-health":9.6882083290695},{"file":"src/Paramore.Brighter.BoxProvisioning/BoxProvisioningOptions.cs","loc":14,"old-loc":13,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.BoxProvisioning/SqlBoxMigrationRunner.cs","loc":213,"old-loc":195,"code-health":8.579508856062546,"old-code-health":8.891477397051005},{"file":"src/Paramore.Brighter.BoxProvisioning/SqlBoxProvisioner.cs","loc":121,"old-loc":95,"code-health":9.6882083290695,"old-code-health":9.6882083290695}]},"removed-files":{"code-health":0.0,"old-code-health":0.0,"files":[]},"external-review-id":"4155","analysis-time":"2026-05-31T21:48:34Z","negative-impact-count":8,"suppressions":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"affected-hotspots":6,"commits":["29eb7418d4920339ee2abf3dc1e6870115c29c0b","54ad3894de4d88c4eccf2f3355aa64e6a69c1244","afc5cae3ffaf87a2f0d3aa5c902aa75bf81d3608","028c1614f6531e76b6857d260e81460647b05661","a0d6242410e3b86e42f6b8cfd5b22c645cc0afab","7938c56abdcd82fddb0a864c72e664ef48f54d70","c1446e77c35dfe66b001d52d8f47f678e188f6d2","d2b3936474dbdf376c36b4608c307e8929c77092","7d673800d52cc7
5f0a0bc3f0b5d97d0e19dc9610","d1d6263b34c2982421a0e9a2cd405a1c6bd87131","b7d22923afaca116a3e3db4f0b89539bafdbf956","8f8755a4d79d4dc6b2f29612a8d652684048c514","05a8bc9939676e87b039e5b394c4988478118947","11857403544d1803751e4d279253e7cd9184a52e","389daf1c452bdaadbdba481a1dce20bcce7db952","cfe69ce73bf879a69246cf5f9efcee27b1ff9a08","c4a40f872d3ed59d43937994cfa27308cd2161b1","4ffb7cbb926507527215ce125b4017d14e07c52c","a5bc62d2810601a7d9ee26d558a13addd931708b","981c7fa05cefbd347d1375d1310fb61d739886da","991b4440bfeb2c1cf859b904c3558b2c9ea3e0ab","2ae34c9c657f7260df153565f2c9d8e5e8f07a4b","c6335eb0056c6953eabee229d46295b6c5cbfada","2a0674b21f7ff8ed433285777a407ead2ef18dd0","ecb4f10d426cfef77a8db1e1b237fc2c83952120","287d6b387d507d45c5a7e9c060f33b91d55cc55d","626748f0924172b6e5e75af9b8677fe0057406c7","ba8b9eff26366c4b6f517d00133f7b3be7879596","375f6f809932507247e6454b758bdca75bd1fafa","8512de1c085f53c5c0dfa919a3e486883da17d08"],"is-negative-review":true,"negative-findings":{"number-of-types":3,"number-of-files-touched":6,"findings":[{"why-it-occurs":"String is a generic type that fail to capture the constraints of the domain object it represents. In this module, 40 % of all function arguments are string types.","name":"String Heavy Function Arguments","file":"src/Paramore.Brighter.BoxProvisioning.MsSql/MsSqlBoxDetectionHelper.cs","refactoring-examples":null,"change-level":"warning","is-hotspot?":false,"what-changed":"In this module, 40.0% of all arguments to its 7 functions are strings. The threshold for string arguments is 39.0%","how-to-fix":"Heavy string usage indicates a missing domain language. Introduce data types that encapsulate the semantics. For example, a user_name is better represented as a constrained User type rather than a pure string, which could be anything.","change-type":"introduced"},{"why-it-occurs":"String is a generic type that fail to capture the constraints of the domain object it represents. 
In this module, 41 % of all function arguments are string types.","name":"String Heavy Function Arguments","file":"src/Paramore.Brighter.BoxProvisioning.MySql/MySqlBoxDetectionHelper.cs","refactoring-examples":null,"change-level":"warning","is-hotspot?":false,"what-changed":"In this module, 41.2% of all arguments to its 7 functions are strings. The threshold for string arguments is 39.0%","how-to-fix":"Heavy string usage indicates a missing domain language. Introduce data types that encapsulate the semantics. For example, a user_name is better represented as a constrained User type rather than a pure string, which could be anything.","change-type":"introduced"},{"method":"DoesHistoryExistAsync","why-it-occurs":"A Complex Method has a high cyclomatic complexity. The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter.BoxProvisioning.PostgreSql/PostgreSqlBoxDetectionHelper.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"1","loc-deleted":"2","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"NJsonSchema represents class inheritance via an embedded\n{ definitions: { Base: {...} }, allOf: [{$ref: \"#/definitions/Base\"}, ...] }\nblock on every derived message schema. The previous output duplicated the base\ntype once per message and rewrote refs to resolve inside each message's payload\nsubtree.\n\nThis commit lifts all definitions/$defs entries to a single shared pool under\ncomponents.schemas, dedupes by name (first-seen content wins), and rewrites refs\nacross both the message bodies and the hoisted definitions themselves so\ncross-definition refs continue to resolve. 
A class hierarchy like\nPaymentReceivedEvent and OrderCreatedEvent both inheriting Event now produces a\nsingle Event entry under components.schemas, with each message's allOf pointing\nat #/components/schemas/Event.\n\nSide benefits:\n- Smaller output (~18% on the sample documents).\n- Tooling consuming the spec sees the base type once and can generate a shared\n  parent class on the client side.\n\nDrops the last open CodeScene Complex Method finding by collapsing the\nJsonValueKind.True / False arms in JsonElementToObject into a single\nGetBoolean() arm.\n\nTests rewritten to assert the new ref shape (#/components/schemas/X) and the\npresence of the hoisted entries under Components.Schemas.\n\nCo-Authored-By: Claude <noreply@anthropic.com>","commit-date":"2026-05-16T19:26:51Z","current-rev":"f5003d644","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs","previous-rev":"f38857a51","commit-title":"feat(#3828): hoist shared JSON Schema definitions to components.schemas","language":"C#","id":"a281e7924ca426d627f9bb709daa231724381400","model-score":0.92,"author-id":null,"project-id":32198,"delta-file-score":0.31179166,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\nindex 5bc24c0a6..6022aa42b 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n@@ -191,4 +191,3 @@ private static string SerializeAsYaml(object? 
tree)\n             JsonValueKind.Number => ReadNumber(element),\n-            JsonValueKind.True => true,\n-            JsonValueKind.False => false,\n+            JsonValueKind.True or JsonValueKind.False => element.GetBoolean(),\n             _ => null,\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"17","loc-deleted":"14","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.08","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"Extract LoadAssemblyTypes (handles ReflectionTypeLoadException) and\nTryGetPublicationTopic (per-type filter + topic resolution) from\nGetPublicationTopicTypes. Targets the last CodeScene advisory finding\n(Complex Method).\n\n46/46 AsyncAPI tests pass on net9.0 and net10.0.\n\nCo-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>","commit-date":"2026-05-17T06:26:45Z","current-rev":"7f8b45280","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs","previous-rev":"6be065b71","commit-title":"refactor(#3828): split GetPublicationTopicTypes for clarity","language":"C#","id":"2287bc725ce16bc068f9b3178aae023ee15b02f8","model-score":0.5,"author-id":null,"project-id":32198,"delta-file-score":0.33626333,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\nindex 4a13108c1..6f8bd66a9 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n@@ -248,6 +248,14 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n         {\n-            Type[] types;\n+            foreach (var type in LoadAssemblyTypes(assembly))\n+            {\n+                var topic = TryGetPublicationTopic(type);\n+                if (topic != null) yield return (type, topic);\n+            }\n+        
}\n+\n+        private static IEnumerable<Type> LoadAssemblyTypes(Assembly assembly)\n+        {\n             try\n             {\n-                types = assembly.GetTypes();\n+                return assembly.GetTypes();\n             }\n@@ -255,18 +263,13 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n             {\n-                types = ex.Types.Where(t => t != null).ToArray()!;\n+                return ex.Types.Where(t => t != null).ToArray()!;\n             }\n+        }\n \n-            foreach (var type in types)\n-            {\n-                if (type.IsAbstract || type.IsInterface) continue;\n-                if (!typeof(IRequest).IsAssignableFrom(type)) continue;\n-\n-                var attr = type.GetCustomAttribute<PublicationTopicAttribute>();\n-                if (attr == null) continue;\n-\n-                var topic = attr.Destination?.RoutingKey?.Value;\n-                if (string.IsNullOrEmpty(topic)) continue;\n+        private static string? TryGetPublicationTopic(Type type)\n+        {\n+            if (type.IsAbstract || type.IsInterface) return null;\n+            if (!typeof(IRequest).IsAssignableFrom(type)) return null;\n \n-                yield return (type, topic);\n-            }\n+            var topic = type.GetCustomAttribute<PublicationTopicAttribute>()?.Destination?.RoutingKey?.Value;\n+            return string.IsNullOrEmpty(topic) ? 
null : topic;\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing 
key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git 
a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? 
delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? 
new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            
messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? 
string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 
974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" 
/> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? 
throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out 
var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ 
-147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? 
delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  
Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Tom Longhurst","training-data":{"loc-added":"74","loc-deleted":"33","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"9.387218218812514"},"author-email":"30480171+thomhurst@users.noreply.github.com","commit-full-message":"* fix: serialise Dispatcher.Start so consumers are Open before Receive() returns (#4075)\n\nDispatcher.Start() flipped State to DS_RUNNING before opening consumers, so\nReceive()'s busy-wait could return while some consumers were still Shut. A\nShut()/End() racing into that window no-op'd against not-yet-Open consumers\n(Consumer.Shut only acts when State == Open). The control task then opened\nthose consumers, leaving orphan performers parked in Task.Delay forever and\nhanging End() in Task.WaitAny.\n\nOpen every consumer and register its task before publishing DS_RUNNING, and\nreplace the 100ms poll with a TaskCompletionSource that gives callers an\nexplicit happens-before edge over the opens. 
Add a lock + pending-shut flag\non Consumer so a Shut() arriving before Open() is honoured (defence in depth\nfor any future caller path that opens off the control task).\n\n* chore: trim verbose comments on #4075 fix\n\n* refactor: flatten Dispatcher.Start nesting (CodeScene Bumpy Road)\n\nExtract control-loop body into RunControlLoop, then split into\nTryOpenConsumers, WaitForPerformersToStop, HandleNextStoppedPerformer,\nand RemoveConsumerForTask. Each helper has at most one level of\nconditional nesting, killing the \"Bumpy Road Ahead\" flag on\nPR #4081 without changing behavior.\n\n* refactor: simplify TryOpenConsumers and drop redundant OfType\n\n- Collapse TryOpenConsumers (bool, dead false-branch) into void\n  OpenConsumers; move TCS signalling and try/catch up into\n  RunControlLoop where they belong.\n- Iterate Consumers directly instead of OfType<Consumer>(); the\n  IAmAConsumer interface already exposes Open/Job/JobId.\n\n---------\n\nCo-authored-by: Ian Cooper <ian_hammond_cooper@yahoo.co.uk>","commit-date":"2026-04-26T17:18:29Z","current-rev":"f94246cd8","filename":"Brighter/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs","previous-rev":"3a0528638","commit-title":"fix: Dispatcher shutdown race leaves late-opened Reactor consumers running (#4075) (#4081)","language":"C#","id":"b453d2853c4228105b707fea96483f93cd8ee3c1","model-score":0.1,"author-id":null,"project-id":32198,"delta-file-score":0.84183866,"diff":"diff --git a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\nindex 8abdbe43d..a9ca4b575 100644\n--- a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n+++ b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n@@ -284,63 +284,104 @@ private void Start()\n         {\n-            _controlTask = Task.Factory.StartNew(() =>\n+            // Block Start() callers until every consumer is Open. 
A Shut()/End() racing in\n+            // immediately after Receive() returns must not see a still-Shut consumer, or the\n+            // late-opened performer leaks and End() hangs forever in Task.WaitAny.\n+            var startup = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);\n+\n+            _controlTask = Task.Factory.StartNew(\n+                () => RunControlLoop(startup),\n+                CancellationToken.None,\n+                TaskCreationOptions.LongRunning,\n+                TaskScheduler.Default);\n+\n+            startup.Task.GetAwaiter().GetResult();\n+        }\n+\n+        private void RunControlLoop(TaskCompletionSource<bool> startup)\n+        {\n+            if (State != DispatcherState.DS_AWAITING && State != DispatcherState.DS_STOPPED)\n             {\n-                if (State == DispatcherState.DS_AWAITING || State == DispatcherState.DS_STOPPED)\n-                {\n-                    Log.DispatcherStarting(s_logger);\n-                    State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+                return;\n+            }\n+\n+            Log.DispatcherStarting(s_logger);\n+\n+            try\n+            {\n+                OpenConsumers();\n+                State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+            }\n+            catch (Exception ex)\n+            {\n+                Log.ErrorOnConsumer(s_logger, ex);\n+                startup.TrySetException(ex);\n+                throw;\n+            }\n+\n+            Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n \n-                    var consumers = Consumers.ToArray();\n-                    consumers.Each(consumer => consumer.Open());\n-                    consumers.Each(consumer => _tasks.TryAdd(consumer.JobId, consumer.Job!));\n+            WaitForPerformersToStop();\n+\n+            State = DispatcherState.DS_STOPPED;\n+            
Log.DispatcherStopped(s_logger);\n+        }\n \n-                    Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n+        private void OpenConsumers()\n+        {\n+            foreach (var consumer in Consumers)\n+            {\n+                consumer.Open();\n+                if (consumer.Job is not null)\n+                    _tasks.TryAdd(consumer.JobId, consumer.Job);\n+            }\n+        }\n \n-                    while (_tasks.Any())\n+        private void WaitForPerformersToStop()\n+        {\n+            while (!_tasks.IsEmpty)\n+            {\n+                try\n+                {\n+                    HandleNextStoppedPerformer();\n+                }\n+                catch (AggregateException ae)\n+                {\n+                    ae.Handle(ex =>\n                     {\n-                        try\n-                        {\n-                            var runningTasks = _tasks.Values.ToArray();\n-                            var index = Task.WaitAny(runningTasks);\n-                            var stoppingConsumer = runningTasks[index];\n-                            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n-\n-                            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n-                            if (consumer != null)\n-                            {\n-                                Log.RemovingConsumer(s_logger, consumer.Name);\n-\n-                                if (_consumers.TryRemove(consumer.Name, out consumer))\n-                                {\n-                                    consumer.Dispose();\n-                                }\n-                            }\n-\n-                            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n-                            {\n-                                removedTask?.Dispose();\n-                            }\n-\n-                            
stoppingConsumer.Dispose();\n-                        }\n-                        catch (AggregateException ae)\n-                        {\n-                            ae.Handle(ex =>\n-                            {\n-                                Log.ErrorOnConsumer(s_logger, ex);\n-                                return true;\n-                            });\n-                        }\n-                    }\n-\n-                    State = DispatcherState.DS_STOPPED;\n-                    Log.DispatcherStopped(s_logger);\n+                        Log.ErrorOnConsumer(s_logger, ex);\n+                        return true;\n+                    });\n                 }\n-            },\n-            CancellationToken.None,\n-            TaskCreationOptions.LongRunning,\n-            TaskScheduler.Default);\n+            }\n+        }\n+\n+        private void HandleNextStoppedPerformer()\n+        {\n+            var runningTasks = _tasks.Values.ToArray();\n+            var index = Task.WaitAny(runningTasks);\n+            var stoppingConsumer = runningTasks[index];\n+            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n+\n+            RemoveConsumerForTask(stoppingConsumer);\n+\n+            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n+            {\n+                removedTask?.Dispose();\n+            }\n+\n+            stoppingConsumer.Dispose();\n+        }\n+\n+        private void RemoveConsumerForTask(Task stoppingConsumer)\n+        {\n+            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n+            if (consumer is null)\n+                return;\n+\n+            Log.RemovingConsumer(s_logger, consumer.Name);\n \n-            while (State != DispatcherState.DS_RUNNING)\n+            if (_consumers.TryRemove(consumer.Name, out consumer))\n             {\n-                Thread.Sleep(100); //Block main Dispatcher thread whilst control plane starts\n+                
consumer.Dispose();\n             }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":false,"line":126,"what-changed":"DoesHistoryExistAsync has a cyclomatic complexity of 9, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"introduced"},{"why-it-occurs":"String is a generic type that fail to capture the constraints of the domain object it represents. In this module, 41 % of all function arguments are string types.","name":"String Heavy Function Arguments","file":"src/Paramore.Brighter.BoxProvisioning.PostgreSql/PostgreSqlBoxDetectionHelper.cs","refactoring-examples":null,"change-level":"warning","is-hotspot?":false,"what-changed":"In this module, 40.5% of all arguments to its 9 functions are strings. The threshold for string arguments is 39.0%","how-to-fix":"Heavy string usage indicates a missing domain language. Introduce data types that encapsulate the semantics. For example, a user_name is better represented as a constrained User type rather than a pure string, which could be anything.","change-type":"introduced"},{"method":"EnsureHistoryTableAsync","why-it-occurs":"A Complex Method has a high cyclomatic complexity. 
The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter.BoxProvisioning.PostgreSql/PostgreSqlBoxMigrationRunner.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"1","loc-deleted":"2","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"NJsonSchema represents class inheritance via an embedded\n{ definitions: { Base: {...} }, allOf: [{$ref: \"#/definitions/Base\"}, ...] }\nblock on every derived message schema. The previous output duplicated the base\ntype once per message and rewrote refs to resolve inside each message's payload\nsubtree.\n\nThis commit lifts all definitions/$defs entries to a single shared pool under\ncomponents.schemas, dedupes by name (first-seen content wins), and rewrites refs\nacross both the message bodies and the hoisted definitions themselves so\ncross-definition refs continue to resolve. 
A class hierarchy like\nPaymentReceivedEvent and OrderCreatedEvent both inheriting Event now produces a\nsingle Event entry under components.schemas, with each message's allOf pointing\nat #/components/schemas/Event.\n\nSide benefits:\n- Smaller output (~18% on the sample documents).\n- Tooling consuming the spec sees the base type once and can generate a shared\n  parent class on the client side.\n\nDrops the last open CodeScene Complex Method finding by collapsing the\nJsonValueKind.True / False arms in JsonElementToObject into a single\nGetBoolean() arm.\n\nTests rewritten to assert the new ref shape (#/components/schemas/X) and the\npresence of the hoisted entries under Components.Schemas.\n\nCo-Authored-By: Claude <noreply@anthropic.com>","commit-date":"2026-05-16T19:26:51Z","current-rev":"f5003d644","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs","previous-rev":"f38857a51","commit-title":"feat(#3828): hoist shared JSON Schema definitions to components.schemas","language":"C#","id":"a281e7924ca426d627f9bb709daa231724381400","model-score":0.92,"author-id":null,"project-id":32198,"delta-file-score":0.31179166,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\nindex 5bc24c0a6..6022aa42b 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n@@ -191,4 +191,3 @@ private static string SerializeAsYaml(object? 
tree)\n             JsonValueKind.Number => ReadNumber(element),\n-            JsonValueKind.True => true,\n-            JsonValueKind.False => false,\n+            JsonValueKind.True or JsonValueKind.False => element.GetBoolean(),\n             _ => null,\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"17","loc-deleted":"14","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.08","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"Extract LoadAssemblyTypes (handles ReflectionTypeLoadException) and\nTryGetPublicationTopic (per-type filter + topic resolution) from\nGetPublicationTopicTypes. Targets the last CodeScene advisory finding\n(Complex Method).\n\n46/46 AsyncAPI tests pass on net9.0 and net10.0.\n\nCo-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>","commit-date":"2026-05-17T06:26:45Z","current-rev":"7f8b45280","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs","previous-rev":"6be065b71","commit-title":"refactor(#3828): split GetPublicationTopicTypes for clarity","language":"C#","id":"2287bc725ce16bc068f9b3178aae023ee15b02f8","model-score":0.5,"author-id":null,"project-id":32198,"delta-file-score":0.33626333,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\nindex 4a13108c1..6f8bd66a9 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n@@ -248,6 +248,14 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n         {\n-            Type[] types;\n+            foreach (var type in LoadAssemblyTypes(assembly))\n+            {\n+                var topic = TryGetPublicationTopic(type);\n+                if (topic != null) yield return (type, topic);\n+            }\n+        
}\n+\n+        private static IEnumerable<Type> LoadAssemblyTypes(Assembly assembly)\n+        {\n             try\n             {\n-                types = assembly.GetTypes();\n+                return assembly.GetTypes();\n             }\n@@ -255,18 +263,13 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n             {\n-                types = ex.Types.Where(t => t != null).ToArray()!;\n+                return ex.Types.Where(t => t != null).ToArray()!;\n             }\n+        }\n \n-            foreach (var type in types)\n-            {\n-                if (type.IsAbstract || type.IsInterface) continue;\n-                if (!typeof(IRequest).IsAssignableFrom(type)) continue;\n-\n-                var attr = type.GetCustomAttribute<PublicationTopicAttribute>();\n-                if (attr == null) continue;\n-\n-                var topic = attr.Destination?.RoutingKey?.Value;\n-                if (string.IsNullOrEmpty(topic)) continue;\n+        private static string? TryGetPublicationTopic(Type type)\n+        {\n+            if (type.IsAbstract || type.IsInterface) return null;\n+            if (!typeof(IRequest).IsAssignableFrom(type)) return null;\n \n-                yield return (type, topic);\n-            }\n+            var topic = type.GetCustomAttribute<PublicationTopicAttribute>()?.Destination?.RoutingKey?.Value;\n+            return string.IsNullOrEmpty(topic) ? 
null : topic;\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing 
key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git 
a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? 
delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? 
new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            
messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? 
string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 
974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" 
/> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? 
throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out 
var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ 
-147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? 
delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  
Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Tom Longhurst","training-data":{"loc-added":"74","loc-deleted":"33","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"9.387218218812514"},"author-email":"30480171+thomhurst@users.noreply.github.com","commit-full-message":"* fix: serialise Dispatcher.Start so consumers are Open before Receive() returns (#4075)\n\nDispatcher.Start() flipped State to DS_RUNNING before opening consumers, so\nReceive()'s busy-wait could return while some consumers were still Shut. A\nShut()/End() racing into that window no-op'd against not-yet-Open consumers\n(Consumer.Shut only acts when State == Open). The control task then opened\nthose consumers, leaving orphan performers parked in Task.Delay forever and\nhanging End() in Task.WaitAny.\n\nOpen every consumer and register its task before publishing DS_RUNNING, and\nreplace the 100ms poll with a TaskCompletionSource that gives callers an\nexplicit happens-before edge over the opens. 
Add a lock + pending-shut flag\non Consumer so a Shut() arriving before Open() is honoured (defence in depth\nfor any future caller path that opens off the control task).\n\n* chore: trim verbose comments on #4075 fix\n\n* refactor: flatten Dispatcher.Start nesting (CodeScene Bumpy Road)\n\nExtract control-loop body into RunControlLoop, then split into\nTryOpenConsumers, WaitForPerformersToStop, HandleNextStoppedPerformer,\nand RemoveConsumerForTask. Each helper has at most one level of\nconditional nesting, killing the \"Bumpy Road Ahead\" flag on\nPR #4081 without changing behavior.\n\n* refactor: simplify TryOpenConsumers and drop redundant OfType\n\n- Collapse TryOpenConsumers (bool, dead false-branch) into void\n  OpenConsumers; move TCS signalling and try/catch up into\n  RunControlLoop where they belong.\n- Iterate Consumers directly instead of OfType<Consumer>(); the\n  IAmAConsumer interface already exposes Open/Job/JobId.\n\n---------\n\nCo-authored-by: Ian Cooper <ian_hammond_cooper@yahoo.co.uk>","commit-date":"2026-04-26T17:18:29Z","current-rev":"f94246cd8","filename":"Brighter/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs","previous-rev":"3a0528638","commit-title":"fix: Dispatcher shutdown race leaves late-opened Reactor consumers running (#4075) (#4081)","language":"C#","id":"b453d2853c4228105b707fea96483f93cd8ee3c1","model-score":0.1,"author-id":null,"project-id":32198,"delta-file-score":0.84183866,"diff":"diff --git a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\nindex 8abdbe43d..a9ca4b575 100644\n--- a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n+++ b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n@@ -284,63 +284,104 @@ private void Start()\n         {\n-            _controlTask = Task.Factory.StartNew(() =>\n+            // Block Start() callers until every consumer is Open. 
A Shut()/End() racing in\n+            // immediately after Receive() returns must not see a still-Shut consumer, or the\n+            // late-opened performer leaks and End() hangs forever in Task.WaitAny.\n+            var startup = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);\n+\n+            _controlTask = Task.Factory.StartNew(\n+                () => RunControlLoop(startup),\n+                CancellationToken.None,\n+                TaskCreationOptions.LongRunning,\n+                TaskScheduler.Default);\n+\n+            startup.Task.GetAwaiter().GetResult();\n+        }\n+\n+        private void RunControlLoop(TaskCompletionSource<bool> startup)\n+        {\n+            if (State != DispatcherState.DS_AWAITING && State != DispatcherState.DS_STOPPED)\n             {\n-                if (State == DispatcherState.DS_AWAITING || State == DispatcherState.DS_STOPPED)\n-                {\n-                    Log.DispatcherStarting(s_logger);\n-                    State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+                return;\n+            }\n+\n+            Log.DispatcherStarting(s_logger);\n+\n+            try\n+            {\n+                OpenConsumers();\n+                State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+            }\n+            catch (Exception ex)\n+            {\n+                Log.ErrorOnConsumer(s_logger, ex);\n+                startup.TrySetException(ex);\n+                throw;\n+            }\n+\n+            Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n \n-                    var consumers = Consumers.ToArray();\n-                    consumers.Each(consumer => consumer.Open());\n-                    consumers.Each(consumer => _tasks.TryAdd(consumer.JobId, consumer.Job!));\n+            WaitForPerformersToStop();\n+\n+            State = DispatcherState.DS_STOPPED;\n+            
Log.DispatcherStopped(s_logger);\n+        }\n \n-                    Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n+        private void OpenConsumers()\n+        {\n+            foreach (var consumer in Consumers)\n+            {\n+                consumer.Open();\n+                if (consumer.Job is not null)\n+                    _tasks.TryAdd(consumer.JobId, consumer.Job);\n+            }\n+        }\n \n-                    while (_tasks.Any())\n+        private void WaitForPerformersToStop()\n+        {\n+            while (!_tasks.IsEmpty)\n+            {\n+                try\n+                {\n+                    HandleNextStoppedPerformer();\n+                }\n+                catch (AggregateException ae)\n+                {\n+                    ae.Handle(ex =>\n                     {\n-                        try\n-                        {\n-                            var runningTasks = _tasks.Values.ToArray();\n-                            var index = Task.WaitAny(runningTasks);\n-                            var stoppingConsumer = runningTasks[index];\n-                            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n-\n-                            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n-                            if (consumer != null)\n-                            {\n-                                Log.RemovingConsumer(s_logger, consumer.Name);\n-\n-                                if (_consumers.TryRemove(consumer.Name, out consumer))\n-                                {\n-                                    consumer.Dispose();\n-                                }\n-                            }\n-\n-                            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n-                            {\n-                                removedTask?.Dispose();\n-                            }\n-\n-                            
stoppingConsumer.Dispose();\n-                        }\n-                        catch (AggregateException ae)\n-                        {\n-                            ae.Handle(ex =>\n-                            {\n-                                Log.ErrorOnConsumer(s_logger, ex);\n-                                return true;\n-                            });\n-                        }\n-                    }\n-\n-                    State = DispatcherState.DS_STOPPED;\n-                    Log.DispatcherStopped(s_logger);\n+                        Log.ErrorOnConsumer(s_logger, ex);\n+                        return true;\n+                    });\n                 }\n-            },\n-            CancellationToken.None,\n-            TaskCreationOptions.LongRunning,\n-            TaskScheduler.Default);\n+            }\n+        }\n+\n+        private void HandleNextStoppedPerformer()\n+        {\n+            var runningTasks = _tasks.Values.ToArray();\n+            var index = Task.WaitAny(runningTasks);\n+            var stoppingConsumer = runningTasks[index];\n+            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n+\n+            RemoveConsumerForTask(stoppingConsumer);\n+\n+            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n+            {\n+                removedTask?.Dispose();\n+            }\n+\n+            stoppingConsumer.Dispose();\n+        }\n+\n+        private void RemoveConsumerForTask(Task stoppingConsumer)\n+        {\n+            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n+            if (consumer is null)\n+                return;\n+\n+            Log.RemovingConsumer(s_logger, consumer.Name);\n \n-            while (State != DispatcherState.DS_RUNNING)\n+            if (_consumers.TryRemove(consumer.Name, out consumer))\n             {\n-                Thread.Sleep(100); //Block main Dispatcher thread whilst control plane starts\n+                
consumer.Dispose();\n             }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":true,"line":143,"what-changed":"EnsureHistoryTableAsync has a cyclomatic complexity of 9, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"introduced"},{"why-it-occurs":"String is a generic type that fail to capture the constraints of the domain object it represents. In this module, 40 % of all function arguments are string types.","name":"String Heavy Function Arguments","file":"src/Paramore.Brighter.BoxProvisioning.Sqlite/SqliteBoxDetectionHelper.cs","refactoring-examples":null,"change-level":"warning","is-hotspot?":false,"what-changed":"In this module, 40.0% of all arguments to its 7 functions are strings. The threshold for string arguments is 39.0%","how-to-fix":"Heavy string usage indicates a missing domain language. Introduce data types that encapsulate the semantics. For example, a user_name is better represented as a constrained User type rather than a pure string, which could be anything.","change-type":"introduced"},{"method":"MigrateAsync","why-it-occurs":"A Complex Method has a high cyclomatic complexity. 
The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter.BoxProvisioning/SqlBoxMigrationRunner.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"1","loc-deleted":"2","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"NJsonSchema represents class inheritance via an embedded\n{ definitions: { Base: {...} }, allOf: [{$ref: \"#/definitions/Base\"}, ...] }\nblock on every derived message schema. The previous output duplicated the base\ntype once per message and rewrote refs to resolve inside each message's payload\nsubtree.\n\nThis commit lifts all definitions/$defs entries to a single shared pool under\ncomponents.schemas, dedupes by name (first-seen content wins), and rewrites refs\nacross both the message bodies and the hoisted definitions themselves so\ncross-definition refs continue to resolve. 
A class hierarchy like\nPaymentReceivedEvent and OrderCreatedEvent both inheriting Event now produces a\nsingle Event entry under components.schemas, with each message's allOf pointing\nat #/components/schemas/Event.\n\nSide benefits:\n- Smaller output (~18% on the sample documents).\n- Tooling consuming the spec sees the base type once and can generate a shared\n  parent class on the client side.\n\nDrops the last open CodeScene Complex Method finding by collapsing the\nJsonValueKind.True / False arms in JsonElementToObject into a single\nGetBoolean() arm.\n\nTests rewritten to assert the new ref shape (#/components/schemas/X) and the\npresence of the hoisted entries under Components.Schemas.\n\nCo-Authored-By: Claude <noreply@anthropic.com>","commit-date":"2026-05-16T19:26:51Z","current-rev":"f5003d644","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs","previous-rev":"f38857a51","commit-title":"feat(#3828): hoist shared JSON Schema definitions to components.schemas","language":"C#","id":"a281e7924ca426d627f9bb709daa231724381400","model-score":0.92,"author-id":null,"project-id":32198,"delta-file-score":0.31179166,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\nindex 5bc24c0a6..6022aa42b 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n@@ -191,4 +191,3 @@ private static string SerializeAsYaml(object? 
tree)\n             JsonValueKind.Number => ReadNumber(element),\n-            JsonValueKind.True => true,\n-            JsonValueKind.False => false,\n+            JsonValueKind.True or JsonValueKind.False => element.GetBoolean(),\n             _ => null,\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"17","loc-deleted":"14","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.08","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"Extract LoadAssemblyTypes (handles ReflectionTypeLoadException) and\nTryGetPublicationTopic (per-type filter + topic resolution) from\nGetPublicationTopicTypes. Targets the last CodeScene advisory finding\n(Complex Method).\n\n46/46 AsyncAPI tests pass on net9.0 and net10.0.\n\nCo-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>","commit-date":"2026-05-17T06:26:45Z","current-rev":"7f8b45280","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs","previous-rev":"6be065b71","commit-title":"refactor(#3828): split GetPublicationTopicTypes for clarity","language":"C#","id":"2287bc725ce16bc068f9b3178aae023ee15b02f8","model-score":0.5,"author-id":null,"project-id":32198,"delta-file-score":0.33626333,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\nindex 4a13108c1..6f8bd66a9 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n@@ -248,6 +248,14 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n         {\n-            Type[] types;\n+            foreach (var type in LoadAssemblyTypes(assembly))\n+            {\n+                var topic = TryGetPublicationTopic(type);\n+                if (topic != null) yield return (type, topic);\n+            }\n+        
}\n+\n+        private static IEnumerable<Type> LoadAssemblyTypes(Assembly assembly)\n+        {\n             try\n             {\n-                types = assembly.GetTypes();\n+                return assembly.GetTypes();\n             }\n@@ -255,18 +263,13 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n             {\n-                types = ex.Types.Where(t => t != null).ToArray()!;\n+                return ex.Types.Where(t => t != null).ToArray()!;\n             }\n+        }\n \n-            foreach (var type in types)\n-            {\n-                if (type.IsAbstract || type.IsInterface) continue;\n-                if (!typeof(IRequest).IsAssignableFrom(type)) continue;\n-\n-                var attr = type.GetCustomAttribute<PublicationTopicAttribute>();\n-                if (attr == null) continue;\n-\n-                var topic = attr.Destination?.RoutingKey?.Value;\n-                if (string.IsNullOrEmpty(topic)) continue;\n+        private static string? TryGetPublicationTopic(Type type)\n+        {\n+            if (type.IsAbstract || type.IsInterface) return null;\n+            if (!typeof(IRequest).IsAssignableFrom(type)) return null;\n \n-                yield return (type, topic);\n-            }\n+            var topic = type.GetCustomAttribute<PublicationTopicAttribute>()?.Destination?.RoutingKey?.Value;\n+            return string.IsNullOrEmpty(topic) ? 
null : topic;\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing 
key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git 
a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? 
delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? 
new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            
messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? 
string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 
974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" 
/> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? 
throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out 
var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ 
-147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? 
delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  
Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Tom Longhurst","training-data":{"loc-added":"74","loc-deleted":"33","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"9.387218218812514"},"author-email":"30480171+thomhurst@users.noreply.github.com","commit-full-message":"* fix: serialise Dispatcher.Start so consumers are Open before Receive() returns (#4075)\n\nDispatcher.Start() flipped State to DS_RUNNING before opening consumers, so\nReceive()'s busy-wait could return while some consumers were still Shut. A\nShut()/End() racing into that window no-op'd against not-yet-Open consumers\n(Consumer.Shut only acts when State == Open). The control task then opened\nthose consumers, leaving orphan performers parked in Task.Delay forever and\nhanging End() in Task.WaitAny.\n\nOpen every consumer and register its task before publishing DS_RUNNING, and\nreplace the 100ms poll with a TaskCompletionSource that gives callers an\nexplicit happens-before edge over the opens. 
Add a lock + pending-shut flag\non Consumer so a Shut() arriving before Open() is honoured (defence in depth\nfor any future caller path that opens off the control task).\n\n* chore: trim verbose comments on #4075 fix\n\n* refactor: flatten Dispatcher.Start nesting (CodeScene Bumpy Road)\n\nExtract control-loop body into RunControlLoop, then split into\nTryOpenConsumers, WaitForPerformersToStop, HandleNextStoppedPerformer,\nand RemoveConsumerForTask. Each helper has at most one level of\nconditional nesting, killing the \"Bumpy Road Ahead\" flag on\nPR #4081 without changing behavior.\n\n* refactor: simplify TryOpenConsumers and drop redundant OfType\n\n- Collapse TryOpenConsumers (bool, dead false-branch) into void\n  OpenConsumers; move TCS signalling and try/catch up into\n  RunControlLoop where they belong.\n- Iterate Consumers directly instead of OfType<Consumer>(); the\n  IAmAConsumer interface already exposes Open/Job/JobId.\n\n---------\n\nCo-authored-by: Ian Cooper <ian_hammond_cooper@yahoo.co.uk>","commit-date":"2026-04-26T17:18:29Z","current-rev":"f94246cd8","filename":"Brighter/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs","previous-rev":"3a0528638","commit-title":"fix: Dispatcher shutdown race leaves late-opened Reactor consumers running (#4075) (#4081)","language":"C#","id":"b453d2853c4228105b707fea96483f93cd8ee3c1","model-score":0.1,"author-id":null,"project-id":32198,"delta-file-score":0.84183866,"diff":"diff --git a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\nindex 8abdbe43d..a9ca4b575 100644\n--- a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n+++ b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n@@ -284,63 +284,104 @@ private void Start()\n         {\n-            _controlTask = Task.Factory.StartNew(() =>\n+            // Block Start() callers until every consumer is Open. 
A Shut()/End() racing in\n+            // immediately after Receive() returns must not see a still-Shut consumer, or the\n+            // late-opened performer leaks and End() hangs forever in Task.WaitAny.\n+            var startup = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);\n+\n+            _controlTask = Task.Factory.StartNew(\n+                () => RunControlLoop(startup),\n+                CancellationToken.None,\n+                TaskCreationOptions.LongRunning,\n+                TaskScheduler.Default);\n+\n+            startup.Task.GetAwaiter().GetResult();\n+        }\n+\n+        private void RunControlLoop(TaskCompletionSource<bool> startup)\n+        {\n+            if (State != DispatcherState.DS_AWAITING && State != DispatcherState.DS_STOPPED)\n             {\n-                if (State == DispatcherState.DS_AWAITING || State == DispatcherState.DS_STOPPED)\n-                {\n-                    Log.DispatcherStarting(s_logger);\n-                    State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+                return;\n+            }\n+\n+            Log.DispatcherStarting(s_logger);\n+\n+            try\n+            {\n+                OpenConsumers();\n+                State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+            }\n+            catch (Exception ex)\n+            {\n+                Log.ErrorOnConsumer(s_logger, ex);\n+                startup.TrySetException(ex);\n+                throw;\n+            }\n+\n+            Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n \n-                    var consumers = Consumers.ToArray();\n-                    consumers.Each(consumer => consumer.Open());\n-                    consumers.Each(consumer => _tasks.TryAdd(consumer.JobId, consumer.Job!));\n+            WaitForPerformersToStop();\n+\n+            State = DispatcherState.DS_STOPPED;\n+            
Log.DispatcherStopped(s_logger);\n+        }\n \n-                    Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n+        private void OpenConsumers()\n+        {\n+            foreach (var consumer in Consumers)\n+            {\n+                consumer.Open();\n+                if (consumer.Job is not null)\n+                    _tasks.TryAdd(consumer.JobId, consumer.Job);\n+            }\n+        }\n \n-                    while (_tasks.Any())\n+        private void WaitForPerformersToStop()\n+        {\n+            while (!_tasks.IsEmpty)\n+            {\n+                try\n+                {\n+                    HandleNextStoppedPerformer();\n+                }\n+                catch (AggregateException ae)\n+                {\n+                    ae.Handle(ex =>\n                     {\n-                        try\n-                        {\n-                            var runningTasks = _tasks.Values.ToArray();\n-                            var index = Task.WaitAny(runningTasks);\n-                            var stoppingConsumer = runningTasks[index];\n-                            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n-\n-                            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n-                            if (consumer != null)\n-                            {\n-                                Log.RemovingConsumer(s_logger, consumer.Name);\n-\n-                                if (_consumers.TryRemove(consumer.Name, out consumer))\n-                                {\n-                                    consumer.Dispose();\n-                                }\n-                            }\n-\n-                            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n-                            {\n-                                removedTask?.Dispose();\n-                            }\n-\n-                            
stoppingConsumer.Dispose();\n-                        }\n-                        catch (AggregateException ae)\n-                        {\n-                            ae.Handle(ex =>\n-                            {\n-                                Log.ErrorOnConsumer(s_logger, ex);\n-                                return true;\n-                            });\n-                        }\n-                    }\n-\n-                    State = DispatcherState.DS_STOPPED;\n-                    Log.DispatcherStopped(s_logger);\n+                        Log.ErrorOnConsumer(s_logger, ex);\n+                        return true;\n+                    });\n                 }\n-            },\n-            CancellationToken.None,\n-            TaskCreationOptions.LongRunning,\n-            TaskScheduler.Default);\n+            }\n+        }\n+\n+        private void HandleNextStoppedPerformer()\n+        {\n+            var runningTasks = _tasks.Values.ToArray();\n+            var index = Task.WaitAny(runningTasks);\n+            var stoppingConsumer = runningTasks[index];\n+            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n+\n+            RemoveConsumerForTask(stoppingConsumer);\n+\n+            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n+            {\n+                removedTask?.Dispose();\n+            }\n+\n+            stoppingConsumer.Dispose();\n+        }\n+\n+        private void RemoveConsumerForTask(Task stoppingConsumer)\n+        {\n+            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n+            if (consumer is null)\n+                return;\n+\n+            Log.RemovingConsumer(s_logger, consumer.Name);\n \n-            while (State != DispatcherState.DS_RUNNING)\n+            if (_consumers.TryRemove(consumer.Name, out consumer))\n             {\n-                Thread.Sleep(100); //Block main Dispatcher thread whilst control plane starts\n+                
consumer.Dispose();\n             }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":false,"line":179,"what-changed":"MigrateAsync increases in cyclomatic complexity from 10 to 14, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"degraded"},{"method":"MigrateAsync","why-it-occurs":"A complex conditional is an expression inside a branch such as an <code>if</code>-statement which consists of multiple, logical operations. Example: <code>if (x.started() && y.running())</code>. Complex conditionals make the code even harder to read, and contribute to the Complex Method code smell. Encapsulate them.","name":"Complex Conditional","file":"src/Paramore.Brighter.BoxProvisioning/SqlBoxMigrationRunner.cs","refactoring-examples":[{"diff":"diff --git a/complex_conditional.js b/complex_conditional.js\nindex c43da09584..94259ce874 100644\n--- a/complex_conditional.js\n+++ b/complex_conditional.js\n@@ -1,16 +1,34 @@\n function messageReceived(message, timeReceived) {\n-   // Ignore all messages which aren't from known customers:\n-   if (!message.sender &&\n-       customers.getId(message.name) == null) {\n+   // Refactoring #1: encapsulate the business rule in a\n+   // function. 
A clear name replaces the need for the comment:\n+   if (!knownCustomer(message)) {\n      log('spam received -- ignoring');\n      return;\n    }\n \n-  // Provide an auto-reply when outside business hours:\n-  if ((timeReceived.getHours() > 17) ||\n-      (timeReceived.getHours() < 8)) {\n+  // Refactoring #2: encapsulate the business rule.\n+  // Again, note how a clear function name replaces the\n+  // need for a code comment:\n+  if (outsideBusinessHours(timeReceived)) {\n     return autoReplyTo(message);\n   }\n \n   pingAgentFor(message);\n+}\n+\n+function outsideBusinessHours(timeReceived) {\n+  // Refactoring #3: replace magic numbers with\n+  // symbols that communicate with the code reader:\n+  const closingHour = 17;\n+  const openingHour = 8;\n+\n+  const hours = timeReceived.getHours();\n+\n+  // Refactoring #4: simple conditional rules can\n+  // be further clarified by introducing a variable:\n+  const afterClosing = hours > closingHour;\n+  const beforeOpening = hours < openingHour;\n+\n+  // Yeah -- look how clear the business rule is now!\n+  return afterClosing || beforeOpening;\n }\n\\ No newline at end of file\n","language":"c#","improvement-type":"Complex Conditional"}],"change-level":"warning","is-hotspot?":false,"line":201,"what-changed":"MigrateAsync has 1 complex conditionals with 2 branches, threshold = 2","how-to-fix":"Apply the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring so that the complex conditional is encapsulated in a separate function with a good name that captures the business rule. 
Optionally, for simple expressions, introduce a new variable which holds the result of the complex conditional.","change-type":"introduced"}]},"positive-impact-count":5,"repo":"Brighter","code-health":9.017914425049959,"version":"3.0","authors":["Ian Cooper"],"directives":{"added":[],"removed":[]},"positive-findings":{"number-of-types":3,"number-of-files-touched":4,"findings":[{"name":"Missing Arguments Abstractions","file":"src/Paramore.Brighter.BoxProvisioning.MsSql/MsSqlBoxMigrationRunner.cs","change-type":"improved","change-level":"improvement","is-hotspot?":true,"why-it-occurs":"This code health issue is measured as the average number of function arguments across the whole file. A function with many arguments can be simplified either by a) splitting the function if it has too many responsibilities, or b) by introducing an abstraction (class, record, struct, etc.) which encapsulates the arguments. ","how-to-fix":"Start by investigating the responsibilities of the function. Make sure it doesn't do too many things, in which case it should be split into smaller and more cohesive functions. Consider the refactoring [INTRODUCE PARAMETER OBJECT](https://refactoring.com/catalog/introduceParameterObject.html) to encapsulate arguments that refer to the same logical concept.","what-changed":"The average number of function arguments decreases from 4.92 to 4.80, threshold = 4.00"},{"name":"Overall Code Complexity","file":"src/Paramore.Brighter.BoxProvisioning.PostgreSql/PostgreSqlBoxDetectionHelper.cs","change-type":"improved","change-level":"improvement","is-hotspot?":false,"why-it-occurs":"Overall Code Complexity is measured by the mean cyclomatic complexity across all functions in the file. The lower the number, the better.\n\nCyclomatic complexity is a function level metric that measures the number of logical branches (if-else, loops, etc.). 
Cyclomatic complexity is a rough complexity measure, but useful as a way of estimating the minimum number of unit tests you would need. As such, prefer functions with low cyclomatic complexity (2-3 branches).","how-to-fix":"You address the overall cyclomatic complexity by a) modularizing the code, and b) abstracting away the complexity. Let's look at some examples:\n\nModularizing the Code: Do an X-Ray and inspect the local hotspots. Are there any complex conditional expressions? If yes, then do a [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring. Extract the conditional logic into a separate function and put a good name on that function. This clarifies the intent and makes the original function easier to read. Repeat until all complex conditional expressions have been simplified.\n\n","what-changed":"The mean cyclomatic complexity decreases from 4.25 to 4.11, threshold = 4"},{"name":"Missing Arguments Abstractions","file":"src/Paramore.Brighter.BoxProvisioning.PostgreSql/PostgreSqlBoxDetectionHelper.cs","change-type":"improved","change-level":"improvement","is-hotspot?":false,"why-it-occurs":"This code health issue is measured as the average number of function arguments across the whole file. A function with many arguments can be simplified either by a) splitting the function if it has too many responsibilities, or b) by introducing an abstraction (class, record, struct, etc.) which encapsulates the arguments. ","how-to-fix":"Start by investigating the responsibilities of the function. Make sure it doesn't do too many things, in which case it should be split into smaller and more cohesive functions. 
Consider the refactoring [INTRODUCE PARAMETER OBJECT](https://refactoring.com/catalog/introduceParameterObject.html) to encapsulate arguments that refer to the same logical concept.","what-changed":"The average number of function arguments decreases from 4.25 to 4.11, threshold = 4.00"},{"name":"Missing Arguments Abstractions","file":"src/Paramore.Brighter.BoxProvisioning.PostgreSql/PostgreSqlBoxMigrationRunner.cs","change-type":"improved","change-level":"improvement","is-hotspot?":true,"why-it-occurs":"This code health issue is measured as the average number of function arguments across the whole file. A function with many arguments can be simplified either by a) splitting the function if it has too many responsibilities, or b) by introducing an abstraction (class, record, struct, etc.) which encapsulates the arguments. ","how-to-fix":"Start by investigating the responsibilities of the function. Make sure it doesn't do too many things, in which case it should be split into smaller and more cohesive functions. Consider the refactoring [INTRODUCE PARAMETER OBJECT](https://refactoring.com/catalog/introduceParameterObject.html) to encapsulate arguments that refer to the same logical concept.","what-changed":"The average number of function arguments decreases from 4.92 to 4.87, threshold = 4.00"},{"name":"Code Duplication","file":"src/Paramore.Brighter.BoxProvisioning.Sqlite/SqliteBoxDetectionHelper.cs","change-type":"fixed","change-level":"improvement","is-hotspot?":false,"why-it-occurs":"Duplicated code often leads to code that's harder to change since the same logical change has to be done in multiple functions. More duplication gives lower code health.","how-to-fix":"A certain degree of duplicated code might be acceptable. The problems start when it is the same behavior that is duplicated across the functions in the module, ie. a violation of the Don't Repeat Yourself (DRY) principle. 
DRY violations lead to code that is changed together in predictable patterns, which is both expensive and risky. DRY violations can be identified using CodeScene's X-Ray analysis to detect clusters of change coupled functions with high code similarity. [Read More](https://codescene.com/blog/software-revolution-part3/)\n\nOnce you have identified the similarities across functions, look to extract and encapsulate the concept that varies into its own function(s). These shared abstractions can then be re-used, which minimizes the amount of duplication and simplifies change.","what-changed":"The module no longer contains too many functions with similar structure"}]},"notices":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"external-review-provider":"GitHub"},"analysistime":"2026-05-31T21:48:33.000Z","project-name":"Brighter","repository":"https://github.com/BrighterCommand/Brighter.git"}}