{"results":{"result":{"added-files":{"code-health":0.0,"old-code-health":0.0,"files":[]},"external-review-url":"https://github.com/BrighterCommand/Brighter/pull/4070","old-code-health":8.695162932443232,"modified-files":{"code-health":8.67720293119869,"old-code-health":8.695162932443232,"files":[{"file":"src/Paramore.Brighter/Message.cs","loc":121,"old-loc":120,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter/OutboxProducerMediator.cs","loc":869,"old-loc":855,"code-health":6.966247769888114,"old-code-health":6.966247769888114},{"file":"src/Paramore.Brighter/WrapPipeline.cs","loc":66,"old-loc":62,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter/WrapPipelineAsync.cs","loc":71,"old-loc":67,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusMessagePublisher.cs","loc":73,"old-loc":71,"code-health":9.6882083290695,"old-code-health":9.6882083290695},{"file":"src/Paramore.Brighter/MessageHeader.cs","loc":218,"old-loc":188,"code-health":8.43184786202649,"old-code-health":8.43184786202649},{"file":"src/Paramore.Brighter.MessageScheduler.Azure/AzureServiceBusScheduler.cs","loc":203,"old-loc":201,"code-health":8.816158827775617,"old-code-health":8.816158827775617},{"file":"src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SnsMessagePublisher.cs","loc":102,"old-loc":102,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.AWSSQS.V4/SqsMessageSender.cs","loc":130,"old-loc":130,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs","loc":102,"old-loc":102,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","loc":132,"old-loc":132,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.GcpPubSub/Parser.cs","loc":303,"old-loc":303,"code-health":9.096655465156704,"old-code-health":9.096655465156704},{"file":"src/Paramore.Brighter.MessagingGateway.Kafka/KafkaDefaultMessageHeaderBuilder.cs","loc":112,"old-loc":111,"code-health":9.663736664415227,"old-code-health":9.663736664415227},{"file":"src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagePublisher.cs","loc":204,"old-loc":204,"code-health":9.096655465156704,"old-code-health":9.096655465156704},{"file":"src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessagePublisher.cs","loc":174,"old-loc":174,"code-health":9.096655465156704,"old-code-health":9.096655465156704},{"file":"src/Paramore.Brighter.MessagingGateway.Redis/RedisMessagePublisher.cs","loc":149,"old-loc":149,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs","loc":151,"old-loc":149,"code-health":7.931210907354582,"old-code-health":8.189068796234432}]},"removed-files":{"code-health":0.0,"old-code-health":0.0,"files":[]},"external-review-id":"4070","analysis-time":"2026-04-27T07:55:50Z","negative-impact-count":2,"suppressions":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"affected-hotspots":1,"commits":["ee305221877381bdf7dee099a9949c537e40dc14","7fe500feda77ef10480295fa38095ea9534ffbfb","1f6457243b762514db7bd4ef81091b590416ea47","839c4606c8d21f3fa51fe4b0b2fd49913ea6bd1f","7c47c08c977118abac648727a8fd1b762bbb1a66","ae70780bf1947908086e0d71fe48653bdb138765","8176c8839a4ed378c9756d89535980312343fefa","06a7a6ba265b36a1fd1a7d052b40aeb8c48e67b0","20c1c3610626b4110aa5f69d7853cafcd45a6a5f","da71167466f16f5ed175f5957e872f6beb63f1dc","9bfd39871f13cfa8f5d5158b0df883335afb700a","66fc585e06ce800eb454e4c8bb7c344b7837d11c","ed04c52050b594a7fbfa95e515a2b6fa0a12d09a","736612c3e7b38ed5c8a7040a5d33386a30beb180","f16577f8e8e8a120f4b959d657ee8cd56dbaf30e","b4406b9f7f6629f48845660e9811f1432756454a"],"is-negative-review":true,"negative-findings":{"number-of-types":2,"number-of-files-touched":1,"findings":[{"method":"SendWithDelayAsync","why-it-occurs":"A Complex Method has a high cyclomatic complexity. The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" /> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ -147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Tom Longhurst","training-data":{"loc-added":"74","loc-deleted":"33","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"9.387218218812514"},"author-email":"30480171+thomhurst@users.noreply.github.com","commit-full-message":"* fix: serialise Dispatcher.Start so consumers are Open before Receive() returns (#4075)\n\nDispatcher.Start() flipped State to DS_RUNNING before opening consumers, so\nReceive()'s busy-wait could return while some consumers were still Shut. A\nShut()/End() racing into that window no-op'd against not-yet-Open consumers\n(Consumer.Shut only acts when State == Open). The control task then opened\nthose consumers, leaving orphan performers parked in Task.Delay forever and\nhanging End() in Task.WaitAny.\n\nOpen every consumer and register its task before publishing DS_RUNNING, and\nreplace the 100ms poll with a TaskCompletionSource that gives callers an\nexplicit happens-before edge over the opens. Add a lock + pending-shut flag\non Consumer so a Shut() arriving before Open() is honoured (defence in depth\nfor any future caller path that opens off the control task).\n\n* chore: trim verbose comments on #4075 fix\n\n* refactor: flatten Dispatcher.Start nesting (CodeScene Bumpy Road)\n\nExtract control-loop body into RunControlLoop, then split into\nTryOpenConsumers, WaitForPerformersToStop, HandleNextStoppedPerformer,\nand RemoveConsumerForTask. Each helper has at most one level of\nconditional nesting, killing the \"Bumpy Road Ahead\" flag on\nPR #4081 without changing behavior.\n\n* refactor: simplify TryOpenConsumers and drop redundant OfType\n\n- Collapse TryOpenConsumers (bool, dead false-branch) into void\n  OpenConsumers; move TCS signalling and try/catch up into\n  RunControlLoop where they belong.\n- Iterate Consumers directly instead of OfType<Consumer>(); the\n  IAmAConsumer interface already exposes Open/Job/JobId.\n\n---------\n\nCo-authored-by: Ian Cooper <ian_hammond_cooper@yahoo.co.uk>","commit-date":"2026-04-26T17:18:29Z","current-rev":"f94246cd8","filename":"Brighter/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs","previous-rev":"3a0528638","commit-title":"fix: Dispatcher shutdown race leaves late-opened Reactor consumers running (#4075) (#4081)","language":"C#","id":"b453d2853c4228105b707fea96483f93cd8ee3c1","model-score":0.1,"author-id":null,"project-id":32198,"delta-file-score":0.84183866,"diff":"diff --git a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\nindex 8abdbe43d..a9ca4b575 100644\n--- a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n+++ b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n@@ -284,63 +284,104 @@ private void Start()\n         {\n-            _controlTask = Task.Factory.StartNew(() =>\n+            // Block Start() callers until every consumer is Open. A Shut()/End() racing in\n+            // immediately after Receive() returns must not see a still-Shut consumer, or the\n+            // late-opened performer leaks and End() hangs forever in Task.WaitAny.\n+            var startup = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);\n+\n+            _controlTask = Task.Factory.StartNew(\n+                () => RunControlLoop(startup),\n+                CancellationToken.None,\n+                TaskCreationOptions.LongRunning,\n+                TaskScheduler.Default);\n+\n+            startup.Task.GetAwaiter().GetResult();\n+        }\n+\n+        private void RunControlLoop(TaskCompletionSource<bool> startup)\n+        {\n+            if (State != DispatcherState.DS_AWAITING && State != DispatcherState.DS_STOPPED)\n             {\n-                if (State == DispatcherState.DS_AWAITING || State == DispatcherState.DS_STOPPED)\n-                {\n-                    Log.DispatcherStarting(s_logger);\n-                    State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+                return;\n+            }\n+\n+            Log.DispatcherStarting(s_logger);\n+\n+            try\n+            {\n+                OpenConsumers();\n+                State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+            }\n+            catch (Exception ex)\n+            {\n+                Log.ErrorOnConsumer(s_logger, ex);\n+                startup.TrySetException(ex);\n+                throw;\n+            }\n+\n+            Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n \n-                    var consumers = Consumers.ToArray();\n-                    consumers.Each(consumer => consumer.Open());\n-                    consumers.Each(consumer => _tasks.TryAdd(consumer.JobId, consumer.Job!));\n+            WaitForPerformersToStop();\n+\n+            State = DispatcherState.DS_STOPPED;\n+            Log.DispatcherStopped(s_logger);\n+        }\n \n-                    Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n+        private void OpenConsumers()\n+        {\n+            foreach (var consumer in Consumers)\n+            {\n+                consumer.Open();\n+                if (consumer.Job is not null)\n+                    _tasks.TryAdd(consumer.JobId, consumer.Job);\n+            }\n+        }\n \n-                    while (_tasks.Any())\n+        private void WaitForPerformersToStop()\n+        {\n+            while (!_tasks.IsEmpty)\n+            {\n+                try\n+                {\n+                    HandleNextStoppedPerformer();\n+                }\n+                catch (AggregateException ae)\n+                {\n+                    ae.Handle(ex =>\n                     {\n-                        try\n-                        {\n-                            var runningTasks = _tasks.Values.ToArray();\n-                            var index = Task.WaitAny(runningTasks);\n-                            var stoppingConsumer = runningTasks[index];\n-                            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n-\n-                            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n-                            if (consumer != null)\n-                            {\n-                                Log.RemovingConsumer(s_logger, consumer.Name);\n-\n-                                if (_consumers.TryRemove(consumer.Name, out consumer))\n-                                {\n-                                    consumer.Dispose();\n-                                }\n-                            }\n-\n-                            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n-                            {\n-                                removedTask?.Dispose();\n-                            }\n-\n-                            stoppingConsumer.Dispose();\n-                        }\n-                        catch (AggregateException ae)\n-                        {\n-                            ae.Handle(ex =>\n-                            {\n-                                Log.ErrorOnConsumer(s_logger, ex);\n-                                return true;\n-                            });\n-                        }\n-                    }\n-\n-                    State = DispatcherState.DS_STOPPED;\n-                    Log.DispatcherStopped(s_logger);\n+                        Log.ErrorOnConsumer(s_logger, ex);\n+                        return true;\n+                    });\n                 }\n-            },\n-            CancellationToken.None,\n-            TaskCreationOptions.LongRunning,\n-            TaskScheduler.Default);\n+            }\n+        }\n+\n+        private void HandleNextStoppedPerformer()\n+        {\n+            var runningTasks = _tasks.Values.ToArray();\n+            var index = Task.WaitAny(runningTasks);\n+            var stoppingConsumer = runningTasks[index];\n+            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n+\n+            RemoveConsumerForTask(stoppingConsumer);\n+\n+            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n+            {\n+                removedTask?.Dispose();\n+            }\n+\n+            stoppingConsumer.Dispose();\n+        }\n+\n+        private void RemoveConsumerForTask(Task stoppingConsumer)\n+        {\n+            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n+            if (consumer is null)\n+                return;\n+\n+            Log.RemovingConsumer(s_logger, consumer.Name);\n \n-            while (State != DispatcherState.DS_RUNNING)\n+            if (_consumers.TryRemove(consumer.Name, out consumer))\n             {\n-                Thread.Sleep(100); //Block main Dispatcher thread whilst control plane starts\n+                consumer.Dispose();\n             }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":false,"line":58,"what-changed":"SendWithDelayAsync increases in cyclomatic complexity from 28 to 29, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"degraded"},{"method":"SendWithDelayAsync","why-it-occurs":"A complex conditional is an expression inside a branch such as an <code>if</code>-statmeent which consists of multiple, logical operations. Example: <code>if (x.started() && y.running())</code>.Complex conditionals make the code even harder to read, and contribute to the Complex Method code smell. Encapsulate them.","name":"Complex Conditional","file":"src/Paramore.Brighter.MessagingGateway.RocketMQ/RocketMqMessageProducer.cs","refactoring-examples":[{"diff":"diff --git a/complex_conditional.js b/complex_conditional.js\nindex c43da09584..94259ce874 100644\n--- a/complex_conditional.js\n+++ b/complex_conditional.js\n@@ -1,16 +1,34 @@\n function messageReceived(message, timeReceived) {\n-   // Ignore all messages which aren't from known customers:\n-   if (!message.sender &&\n-       customers.getId(message.name) == null) {\n+   // Refactoring #1: encapsulate the business rule in a\n+   // function. A clear name replaces the need for the comment:\n+   if (!knownCustomer(message)) {\n      log('spam received -- ignoring');\n      return;\n    }\n \n-  // Provide an auto-reply when outside business hours:\n-  if ((timeReceived.getHours() > 17) ||\n-      (timeReceived.getHours() < 8)) {\n+  // Refactoring #2: encapsulate the business rule.\n+  // Again, note how a clear function name replaces the\n+  // need for a code comment:\n+  if (outsideBusinessHours(timeReceived)) {\n     return autoReplyTo(message);\n   }\n \n   pingAgentFor(message);\n+}\n+\n+function outsideBusinessHours(timeReceived) {\n+  // Refactoring #3: replace magic numbers with\n+  // symbols that communicate with the code reader:\n+  const closingHour = 17;\n+  const openingHour = 8;\n+\n+  const hours = timeReceived.getHours();\n+\n+  // Refactoring #4: simple conditional rules can\n+  // be further clarified by introducing a variable:\n+  const afterClosing = hours > closingHour;\n+  const beforeOpening = hours < openingHour;\n+\n+  // Yeah -- look how clear the business rule is now!\n+  return afterClosing || beforeOpening;\n }\n\\ No newline at end of file\n","language":"c#","improvement-type":"Complex Conditional"}],"change-level":"warning","is-hotspot?":false,"line":60,"what-changed":"SendWithDelayAsync increases from 2 complex conditionals with 4 branches to 3 complex conditionals with 6 branches, threshold = 2","how-to-fix":"Apply the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring so that the complex conditional is encapsulated in a separate function with a good name that captures the business rule. Optionally, for simple expressions, introduce a new variable which holds the result of the complex conditional.","change-type":"degraded"}]},"positive-impact-count":1,"repo":"Brighter","code-health":8.67720293119869,"version":"3.0","authors":["Jonny Olliff-Lee","DevJonny"],"directives":{"added":[],"removed":[]},"positive-findings":{"number-of-types":1,"number-of-files-touched":1,"findings":[{"name":"Overall Code Complexity","file":"src/Paramore.Brighter/MessageHeader.cs","change-type":"improved","change-level":"improvement","is-hotspot?":false,"why-it-occurs":"Overall Code Complexity is measured by the mean cyclomatic complexity across all functions in the file. The lower the number, the better.\n\nCyclomatic complexity is a function level metric that measures the number of logical branches (if-else, loops, etc.). Cyclomatic complexity is a rough complexity measure, but useful as a way of estimating the minimum number of unit tests you would need. As such, prefer functions with low cyclomatic complexity (2-3 branches).","how-to-fix":"You address the overall cyclomatic complexity by a) modularizing the code, and b) abstract away the complexity. Let's look at some examples:\n\nModularizing the Code: Do an X-Ray and inspect the local hotspots. Are there any complex conditional expressions? If yes, then do a [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring. Extract the conditional logic into a separate function and put a good name on that function. This clarifies the intent and makes the original function easier to read. Repeat until all complex conditional expressions have been simplified.\n\n","what-changed":"The mean cyclomatic complexity decreases from 4.88 to 4.27, threshold = 4"}]},"notices":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"external-review-provider":"GitHub"},"analysistime":"2026-04-27T07:55:50.000Z","project-name":"Brighter","repository":"https://github.com/BrighterCommand/Brighter.git"}}