{"results":{"result":{"added-files":{"code-health":9.824790552828576,"old-code-health":0.0,"files":[{"file":"src/Paramore.Brighter/HandlerMethodDiscovery.cs","loc":37,"code-health":10.0},{"file":"src/Paramore.Brighter/MapperMethodDiscovery.cs","loc":49,"code-health":10.0},{"file":"src/Paramore.Brighter/AndSpecification.cs","loc":29,"code-health":10.0},{"file":"src/Paramore.Brighter/NotSpecification.cs","loc":40,"code-health":10.0},{"file":"src/Paramore.Brighter/OrSpecification.cs","loc":29,"code-health":10.0},{"file":"src/Paramore.Brighter/Validation/PipelineValidationException.cs","loc":30,"code-health":10.0},{"file":"src/Paramore.Brighter/Validation/PipelineValidationResult.cs","loc":22,"code-health":10.0},{"file":"src/Paramore.Brighter/ValidationResult.cs","loc":15,"code-health":10.0},{"file":"src/Paramore.Brighter/ValidationResultCollector.cs","loc":16,"code-health":10.0},{"file":"src/Paramore.Brighter/Validation/HandlerPipelineValidationRules.cs","loc":63,"code-health":10.0},{"file":"src/Paramore.Brighter/Validation/ProducerValidationRules.cs","loc":20,"code-health":10.0},{"file":"src/Paramore.Brighter.ServiceActivator/Validation/ConsumerValidationRules.cs","loc":77,"code-health":9.473415538276296},{"file":"src/Paramore.Brighter/Validation/PipelineValidator.cs","loc":72,"code-health":10.0},{"file":"src/Paramore.Brighter/Validation/PipelineDiagnosticWriter.cs","loc":100,"code-health":9.096655465156704},{"file":"src/Paramore.Brighter.Extensions.DependencyInjection/BrighterPipelineValidationExtensions.cs","loc":68,"code-health":10.0},{"file":"src/Paramore.Brighter.Extensions.DependencyInjection/BrighterValidationHostedService.cs","loc":52,"code-health":10.0},{"file":"src/Paramore.Brighter.Extensions.DependencyInjection/BrighterDiagnosticHostedService.cs","loc":28,"code-health":10.0}]},"external-review-url":"https://github.com/BrighterCommand/Brighter/pull/4033","old-code-health":8.884336464764408,"modified-files":{"code-health":8.900786153740952,"old-code-health":8.884336464764408,"files":[{"file":"src/Paramore.Brighter/Specification.cs","loc":101,"old-loc":41,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter/RequestHandler.cs","loc":71,"old-loc":76,"code-health":9.387218218812514,"old-code-health":9.387218218812514},{"file":"src/Paramore.Brighter/RequestHandlerAsync.cs","loc":76,"old-loc":83,"code-health":9.387218218812514,"old-code-health":9.387218218812514},{"file":"src/Paramore.Brighter/TransformPipelineBuilder.cs","loc":196,"old-loc":190,"code-health":9.387218218812514,"old-code-health":9.096655465156704},{"file":"src/Paramore.Brighter/TransformPipelineBuilderAsync.cs","loc":161,"old-loc":173,"code-health":9.387218218812514,"old-code-health":9.387218218812514},{"file":"src/Paramore.Brighter/PipelineBuilder.cs","loc":397,"old-loc":352,"code-health":7.788037646779413,"old-code-health":7.788037646779413},{"file":"src/Paramore.Brighter/SubscriberRegistry.cs","loc":94,"old-loc":80,"code-health":9.387218218812514,"old-code-health":9.387218218812514},{"file":"src/Paramore.Brighter/MessageMapperRegistry.cs","loc":99,"old-loc":79,"code-health":9.387218218812514,"old-code-health":9.387218218812514},{"file":"src/Paramore.Brighter/Defer/Handlers/DeferMessageOnErrorHandler.cs","loc":34,"old-loc":34,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter/Defer/Handlers/DeferMessageOnErrorHandlerAsync.cs","loc":36,"old-loc":36,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter/DontAck/Handlers/DontAckOnErrorHandler.cs","loc":20,"old-loc":20,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter/DontAck/Handlers/DontAckOnErrorHandlerAsync.cs","loc":22,"old-loc":22,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter/Policies/Handlers/ResilienceExceptionPolicyHandler.cs","loc":44,"old-loc":44,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter/Policies/Handlers/ResilienceExceptionPolicyHandlerAsync.cs","loc":51,"old-loc":51,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter/Reject/Handlers/RejectMessageOnErrorHandler.cs","loc":29,"old-loc":29,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter/Reject/Handlers/RejectMessageOnErrorHandlerAsync.cs","loc":31,"old-loc":31,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionBrighterBuilder.cs","loc":179,"old-loc":179,"code-health":8.816158827775617,"old-code-health":9.096655465156704},{"file":"src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs","loc":596,"old-loc":587,"code-health":7.624891450768529,"old-code-health":7.624891450768529},{"file":"src/Paramore.Brighter.ServiceActivator.Extensions.Hosting/ServiceActivatorHostedService.cs","loc":68,"old-loc":30,"code-health":9.240656298427343,"old-code-health":10.0},{"file":"src/Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection/ServiceCollectionExtensions.cs","loc":155,"old-loc":131,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionSubscriberRegistry.cs","loc":72,"old-loc":66,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.Mediator/Steps.cs","loc":183,"old-loc":182,"code-health":10.0,"old-code-health":10.0}]},"removed-files":{"code-health":0.0,"old-code-health":0.0,"files":[]},"external-review-id":"4033","analysis-time":"2026-04-09T17:56:27Z","negative-impact-count":6,"suppressions":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"affected-hotspots":1,"commits":["d784d35bdabec4e355add39a6793915378c76335","0909036d874ea8cf17c7b2ca590fe00cd941fe43","0ca95529090d643854d4b86d9eaaa2f9db4dfd84","fd0dea0dd985d4b8c6345bc09bbbddd3c09a944f","c55c100773c2bdc66b6fe9588b921e3f5abb215a","340b2c7be533152e409052e23deffac4810d592b","3fd91f911971347e7aa802adc0baee04ad71737f","b1e85503191ad7c86b9bdd1ab4d81e136b9c9337","86a093c26cc7b51edce76d6d609e59418301f27f","83fbf8950c5ee46e5bda02106a7d4d0d00ad1ca4","5c63f11f1cf7864b62119fff0c1e37b2027e2bd0","7165c56d6275211f708298c37f3e24cc05615e26","61e14d8a0830b9fa7ebfa3693dca37759f9de116","8071258a33b945ded6ea813ea9d05029384fab4c","2bbff4cd5139f4d0bf9baaa192a706ea595cb068","aec61258616af9664792d8d5f55585efed30dc5a","62512a2e8984966af5c1e782e89137783209b8d0","3776ddb321d106c9d7c23e9d6d1bbb1f7e2f1161","637ed59e92531fb8bc86fe276455825b2e797381","44940789c8680142804a026599d0a6c01384a1e6","9e4a8f679036c77c2b8c05e2f291674be1aa71ca","3ffcb15936134bd18a6556988c9e8e7e1d28f121","e069165e49768a514663bfaab5fb80277c907638","e758e90ffccc3e093e3696cd7ad58ed59153b4e1","02bb62230120ddcabedf86e594b051c36ff8e257","f70a4a1704598d17afc8b1b508ff09c47cfca274","2d169587ee31614cae33fdc0154588cc471af57d","f9c75339efdbc721256c644df6da95a4ce32a265","b86ffa500bdaf3e416ea8a95b2606a94f97959f8","48c02ad1cf1f0c4513d348f379e0a32874b9e0da","0be9c2124b2ce749aa06073b14b55afee4136e05","82e171a829a9e70cb9812ec2f6b5e3adb979ddf4","ac306450e79e230e37f2c401a3c5d1920faad0f9","c1ad2fe22ad532bdeaefb8e1d813ac0f9f64d2f2","01115dd2d1b8aa217347b69d49362a43b6f062ee","066749b16668b47a8894862c8cd8f8b6994c0614","16a67db619c2f04462ee2605743137749ddcadf4","68b65f6696349860da633301f07b6f4869622e19","082e619339f37742467f6bc32a50b2a0e596615d","07c9c935ed0a0e8968e1ad45747926153c9ea352","7a9bb7eff01c0cdf217927b8078b6efe82a518fb","10372f8633c1b363d4ec340c4d0ec6c818c98b71","0bab1c7d00396eae221d94b99b3791864a4c731d","a043ab4dac4db264dff78f966087a12ce31c8a1c","a7b5637484275ac7d4550d878052a652e775e500","2f24c03b338398f00334dbab135daf676a2969ff","aba5a2a7541a950b48bce49bd4ed9115bdfbb40a","08cf2e2283edf9a60a8de7a3e269a9e92bedfd88","de1828fbf06308b270845daf738c54ec9f1e7c6a","c5e8fe454bd700f9acf7e3b61d6553c17a3ebf4c","557c446a9ad5b7a123b075aac54d9a5d7c7ff572","774fdc5cfcd4faad91ec6e0502777d0358ab3882","34c357bed74a6ebccf45ba9700b4603ff6e92de7","09234ca6d400fcfe9f8f86a6ba7ebedd75a2d768","3d2e4877b534f04179def7004e8e7adb3d4ba504","8852289a6240a3ac0304d87efbd4ff196fa23bff","db726acde8df949cee508126cf74f63e1bd9e4fa","34cdb9e2aa1333b0a546ba85bcfada111339758a","c97a6bd2d42d10d6a4cde646596c06f403605203","d6798a4fb49ab8c1169f36be4a0438d6f2110e81","21a3cd91bc3a967010c303f24e0cc474c83b110c","c5acdc4ed2551640699f2b64d66c91d028e6afda","d134bc10fc62d28c8efae16e40b0a61ab0960a7a","02f0db942e518ec5e566d6c79faa98d691d09915","d0d2aa26d883624255d8827625a6d67db77ee66b","20bfbf7098fa4cf164b513531fc5b74be9dc19ce","81f9ef039dca1b3a1d86f6e1e6a868c912f295d4","a6402c17ee6ca68c7bc359bdd8cdd34a18242d70","7964a486e6cb204c4f53f7b06938c37661f4ee49","0fe0a7f204571b3531cbc5a7574f8cf8b04bfa6c","c4a345d77602ef499ad3ff4f8dd6ba0208807039","aba9eba355e2f88a0a90e298f0e063d23f9b7dd2","f43afc9401438ac4eb7f821131f7d6bda8636ae4","dd467c96bd3e73f50b2a54517982ef7c6e51c75a","de186282650abdaf86dbcbb53039cee7d04cd901","a6e11fd0bee0112c0e4c52edc5e59e3b7641ddae","1b46761b6247dd07e5e7cfe4b5db66060f277d55","b2cde17ef2514f0ecfa171bdbf0d374179c96e8b","90a7526cd9ab2d1a6c4577dcf7600c897ade7143","31ae48cf4ba7dc8cade1bc942b88a1b8d693f915","2e4cc5a5a47c18f06c6cec423b30133544cb82e2","74e4d54a4760625e98b39eca69d7ccc2bf8b5f8a","4a0e4edbf58dcb0122ee6949635b7fc46c3ed2dc","9420a21f0ea902238bb500d14cd0aa6295cd8dac","3105926096ed647427f869458911571b562b2394","b80f9443511dee7d94bcbab8f726bfb886a824ea","76c5966561275f23795aafd13ca33c9c673dcb55","cbda0c3721f27041e7cdcbee357e7e6ada5279c9","c20b28b0466de9d07b50ac369b0f6296461b27a2","404bf598d0861b074c7461c734cf5854964f0ef5","5c97005fd47fb2f29a1fe3184d0eb85443f628c1","244b47231d7631ba6881c09886933ab55794543d","f9138f7c83e219bd0188cda18882c770c6edf319","3ddcedca718b79d5b5347257874eee3e2e2420c2","11393fdd0b657c8bcd3a945d1b04e139ae6877a7","ec4df46a34069fb223b78a26205fe1b6423aa124","9b1acbbf19897b0add3052cf90ffbddd3ea2acb2","bcbbb958b0c75f7be840f1797c07c21e416f2751","093ad195b9fa86e601ac3b014c45b08c737b78b6","dda65b7d8236b2fea33b60ccd033480cd751462a","6d5fdb131b851c37d8371072a3c649799a108de4","641e20b94f7a4d18541e36925cfe2b60b700a966"],"is-negative-review":true,"negative-findings":{"number-of-types":3,"number-of-files-touched":4,"findings":[{"method":"PumpHandlerMatch","why-it-occurs":"A Complex Method has a high cyclomatic complexity. The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter.ServiceActivator/Validation/ConsumerValidationRules.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" /> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ -147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":false,"line":45,"what-changed":"PumpHandlerMatch has a cyclomatic complexity of 19, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"introduced"},{"method":"LogPublications","why-it-occurs":"A Complex Method has a high cyclomatic complexity. The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter/Validation/PipelineDiagnosticWriter.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" /> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ -147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":false,"line":103,"what-changed":"LogPublications has a cyclomatic complexity of 10, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"introduced"},{"method":"LogPublications","why-it-occurs":"Deep nested logic means that you have control structures like if-statements or loops inside other control structures. Deep nested logic increases the cognitive load on the programmer reading the code. The human working memory has a maximum capacity of 3-4 items; beyond that threshold, we struggle with keeping things in our head. Consequently, deep nested logic has a strong correlation to defects and accounts for roughly 20% of all programming mistakes.\n\nCodeScene measures the maximum nesting depth inside each function. The deeper the nesting, the lower the code health. The threshold for the C# language is 4 levels of nesting.","name":"Deep, Nested Complexity","file":"src/Paramore.Brighter/Validation/PipelineDiagnosticWriter.cs","refactoring-examples":null,"change-level":"warning","is-hotspot?":false,"line":103,"what-changed":"LogPublications has a nested complexity depth of 4, threshold = 4","how-to-fix":"Occassionally, it's possible to get rid of the nested logic by [Replacing Conditionals with Guard Clauses](https://refactoring.com/catalog/replaceNestedConditionalWithGuardClauses.html).\n\nAnother viable strategy is to identify smaller building blocks inside the nested chunks of logic and extract those responsibilities into smaller, cohesive, and well-named functions. The [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html) refactoring explains the steps.","change-type":"introduced"},{"method":"RegisterHandlersFromAssembly","why-it-occurs":"A Complex Method has a high cyclomatic complexity. The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionBrighterBuilder.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" /> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ -147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":false,"line":239,"what-changed":"RegisterHandlersFromAssembly has a cyclomatic complexity of 9, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"introduced"},{"method":"StartAsync","why-it-occurs":"A Bumpy Road is a function that contains multiple chunks of nested conditional logic inside the same function. The deeper the nesting and the more bumps, the lower the code health.\n\nA bumpy code road represents a lack of encapsulation which becomes an obstacle to comprehension. In imperative languages there’s also an increased risk for feature entanglement, which leads to complex state management. CodeScene considers the following rules for the code health impact: 1) The deeper the nested conditional logic of each bump, the higher the tax on our working memory. 2) The more bumps inside a function, the more expensive it is to refactor as each bump represents a missing abstraction. 3) The larger each bump – that is, the more lines of code it spans – the harder it is to build up a mental model of the function. The nesting depth for what is considered a bump is  levels of conditionals.","name":"Bumpy Road Ahead","file":"src/Paramore.Brighter.ServiceActivator.Extensions.Hosting/ServiceActivatorHostedService.cs","refactoring-examples":null,"change-level":"warning","is-hotspot?":false,"line":41,"what-changed":"StartAsync has 2 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is 2 blocks per function","how-to-fix":"Bumpy Road implementations indicate a lack of encapsulation. Check out the detailed description of the [Bumpy Road code health issue](https://codescene.com/blog/bumpy-road-code-complexity-in-context/).\n\nA Bumpy Road often suggests that the function/method does too many things. The first refactoring step is to identify the different possible responsibilities of the function. Consider extracting those responsibilities into smaller, cohesive, and well-named functions. The [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html) refactoring is the primary response.","change-type":"introduced"},{"method":"StartAsync","why-it-occurs":"Deep nested logic means that you have control structures like if-statements or loops inside other control structures. Deep nested logic increases the cognitive load on the programmer reading the code. The human working memory has a maximum capacity of 3-4 items; beyond that threshold, we struggle with keeping things in our head. Consequently, deep nested logic has a strong correlation to defects and accounts for roughly 20% of all programming mistakes.\n\nCodeScene measures the maximum nesting depth inside each function. The deeper the nesting, the lower the code health. The threshold for the C# language is 4 levels of nesting.","name":"Deep, Nested Complexity","file":"src/Paramore.Brighter.ServiceActivator.Extensions.Hosting/ServiceActivatorHostedService.cs","refactoring-examples":null,"change-level":"warning","is-hotspot?":false,"line":41,"what-changed":"StartAsync has a nested complexity depth of 4, threshold = 4","how-to-fix":"Occassionally, it's possible to get rid of the nested logic by [Replacing Conditionals with Guard Clauses](https://refactoring.com/catalog/replaceNestedConditionalWithGuardClauses.html).\n\nAnother viable strategy is to identify smaller building blocks inside the nested chunks of logic and extract those responsibilities into smaller, cohesive, and well-named functions. The [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html) refactoring explains the steps.","change-type":"introduced"}]},"positive-impact-count":1,"repo":"Brighter","code-health":9.100159719921903,"version":"3.0","authors":["Ian Cooper"],"directives":{"added":[],"removed":[]},"positive-findings":{"number-of-types":1,"number-of-files-touched":1,"findings":[{"name":"Code Duplication","file":"src/Paramore.Brighter/TransformPipelineBuilder.cs","change-type":"improved","change-level":"improvement","is-hotspot?":false,"why-it-occurs":"Duplicated code often leads to code that's harder to change since the same logical change has to be done in multiple functions. More duplication gives lower code health.","how-to-fix":"A certain degree of duplicated code might be acceptable. The problems start when it is the same behavior that is duplicated across the functions in the module, ie. a violation of the Don't Repeat Yourself (DRY) principle. DRY violations lead to code that is changed together in predictable patterns, which is both expensive and risky. DRY violations can be identified using CodeScene's X-Ray analysis to detect clusters of change coupled functions with high code similarity. [Read More](https://codescene.com/blog/software-revolution-part3/)\n\nOnce you have identified the similarities across functions, look to extract and encapsulate the concept that varies into its own function(s). These shared abstractions can then be re-used, which minimizes the amount of duplication and simplifies change.","what-changed":"reduced similar code in: FindMapToMessage,FindMapToRequest"}]},"notices":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"external-review-provider":"GitHub"},"analysistime":"2026-04-09T17:56:27.000Z","project-name":"Brighter","repository":"https://github.com/BrighterCommand/Brighter.git"}}