{"results":{"result":{"added-files":{"code-health":9.647970376797739,"old-code-health":0.0,"files":[{"file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarChannelFactory.cs","loc":26,"code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarMessageConsumerFactory.cs","loc":38,"code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarMessagingGatewayConnection.cs","loc":27,"code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarProducerFactory.cs","loc":38,"code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarPublication.cs","loc":27,"code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarSubscription.cs","loc":63,"code-health":9.6882083290695},{"file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarMessageProducer.cs","loc":85,"code-health":9.6882083290695},{"file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarMessageConsumer.cs","loc":305,"code-health":9.387218218812514},{"file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarBackgroundMessageConsumer.cs","loc":53,"code-health":10.0}]},"external-review-url":"https://github.com/BrighterCommand/Brighter/pull/3682","old-code-health":8.920523669487118,"modified-files":{"code-health":8.920523669487118,"old-code-health":8.920523669487118,"files":[{"file":"src/Paramore.Brighter/TraceContext.cs","loc":34,"old-loc":34,"code-health":10.0,"old-code-health":10.0},{"file":"src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessageConsumer.cs","loc":372,"old-loc":372,"code-health":8.720298221085535,"old-code-health":8.720298221085535},{"file":"src/Paramore.Brighter.MessagingGateway.RMQ.Sync/RmqMessageConsumerFactory.cs","loc":35,"old-loc":35,"code-health":10.0,"old-code-health":10.0}]},"removed-files":{"code-health":0.0,"old-code-health":0.0,"files":[]},"external-review-id":"3682","analysis-time":"2025-12-03T09:54:44Z","negative-impact-count":4,"suppressions":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"affected-hotspots":0,"commits":["159ee9c8d71b56ded723c5a7422cd410f7615329","91083e5241e5eafbc440f194f187eea7aab090cf","892f467baf00b1bbeaf98f35fce562dde6b0fc1f","65c3d938622f182c8bbe85f264929fb104bfc9b5","2581d2361e8181d4434544df1c730cca20677966","48e1889881f72dd12d62d6902be08dd1e1ca3290","215f017961bb7fe19009109c14ebcc16dabadf39","211bd260b572f1351a996766671b52fb1ba0f10f","5ebf049307174c4f4b24f4619cd934048aedb54c","641f7838ca736fcc7f670db1dadb54d316f5a12e","ccd37c7504f3490519877ee346d1408382126ec0","309f4c6c3a567b69679ed1696720d473dafcc2af","cb86df35d1d85249405c36febb07ce522914a8da","1800a5fe9856065026d6d9b7e417875a6738e1db","9a3d34d69b08ae1d3a688e1cdff6f5413280cc87","f656cb9e37909801ddc3350e5d368a9bb2eabd7a","fb172898c8b37bde559f65a994d76b5682ad1048","9f881774c790f4ce813e71f0434ca932f1c98cd3","6cecb710a278299abaf01484d31c337a60bba36f","233f841777b5988afa0e6e21d587ebadcc0b4860","68fba51c22127259c762312a0459a1811fd1e658","5ed1cc13371e41a9a53cbc0ef48ab354bec5e312","7cf9466ac7117d7a171d07b31ba4082afbd641c8","afe0fa39edf4dcc4cbf7769e6eb8fbe76ecf2d6d","bbf95bf066c36fe30a42b9b6e24df404c80c8cf5","afe30ba07674e91967ea513cbe8ff7bd8d463827","777734e72d435330b9244f760aef5cce422f5b2b","00ecbbc9ee78b41a95c986080aff8d309340ff44","f302defc41efdad5b29ffde31e870cdb3d2049a6","5f440034cb1e689e7718b07a257b037175a4001d","34efccd3e0fc227036080c645b176ec80122434a","443f065544b35e19691ff80a66b720698311f606"],"is-negative-review":true,"negative-findings":{"number-of-types":3,"number-of-files-touched":3,"findings":[{"method":"PulsarSubscription","why-it-occurs":"Counts the maximum number of arguments to each class' constructor. Too many arguments give a lower code health score. The rationale is that many constructor arguments indicate either a unit with low cohesion or an injection of dependencies on the wrong abstraction level.","name":"Constructor Over-Injection","file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarSubscription.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"9","loc-deleted":"8","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* fix: add support for Send to our synchronization context, to allow it to work in more use cases\r\n\r\n* fix: remove blocking wait from consumers that do not have a built in delay function.\r\n\r\n* fix: document where wait completes on the synchronizationcontext via the performer thread.\r\n\r\n* fix: update comments for sync and async, fix one where appropriate\r\n\r\n* feat: add adr describing approach; rename to use reactor and proactor to align with documentation.\r\n\r\n* feat: improve the adr about Brighter's usage of threading.\r\n\r\n* fix: use GetAwaiter().GetResult() for better call stack\r\n\r\n* feat: Update the ADR for IAmAMessageConsumerAsync\r\n\r\n* feat: add IAmAMessageConsumerAsync.cs and initial tests.\r\n\r\n* feat: additional tests for InMemoryConsumerAsync\r\n\r\n* feat: allow proactor to use async methods on transports, if they exist\r\n\r\n* feat: expose async methods from AWS to Proactor.cs\r\n\r\n* fix: nullability of AWS transport\r\n\r\n* fix: add async options to ASB\r\n\r\n* chore: branch switch\r\n\r\n* feat: add async support to Kafka\r\n\r\n* chore: separate summary from remarks\r\n\r\n* feat: add async to RMQ; update to RMQ V7\r\n\r\n* feat: add nullability to RMQ transport\r\n\r\n* feat: add async operations to mssql transport\r\n\r\n* chore: fix nullable issues for MS SQL transport\r\n\r\n* chore: move Redis to using ChannelName and RoutingKey over string, to reduce primitive obsession.\r\n\r\n* feat: upgrade the sychronizationcontext to work with the scheduler, derived from Stephen Cleary work\r\n\r\n* chore: reorg tests around reactor and proactor\r\n\r\n* Shift to run whole loop through synchronizationcontext over parts\r\n\r\n* Make the choice of proactor/reactor easier to understand\r\n\r\n* fix: switch to close implementation of Stephen Cleary's Nito as a starting point; can't use direct as not strong named, and adds depenency we can't fix\r\n\r\n* fix: update ADR to reflect sources changes; update sources to reflect code origins\r\n\r\n* fix: need to be explicit about your pump type if not proactor now\r\n\r\n* fix: license file URI for Stephen Cleary was wrong\r\n\r\n* feat: update ADR to show native support for proactor or reactor\r\n\r\n* fix: match proactor tests to reactor tests to flush issues\r\n\r\n* fix: add duplicates for the tests within Reactor, not in Proactor\r\n\r\n* fix: make rmq broker creation async; add cancellationtoken to sendasync\r\n\r\n* fix: add async tests for RMQ, use to flush out issues with synchronization context, particularly in test scenarios (as runner and framework also have contexts)\r\n\r\n* feat: add tests for internal syncrhonizationcontext, derived from Stephen Cleary\r\n\r\n* fix: tests show that we should remove the reentrancy check; there may be an issue, but this was not how to solve\r\n\r\n* fix: add async disposable pattern to consumer\r\n\r\n* feat: support IAsyncDisposable\r\n\r\n* fix: remove GetAwaiter().GetResult() from hot paths\r\n\r\n* fix: Improve the ADR to reflect results; attribution in the README.md file\r\n\r\n* chore: add some debugging support for thornier issues\r\n\r\n* chore: add better debug statements; helps to diagnose scheduler & context issues. Issue with scheduler being used outside of helper context.\r\n\r\n* chore: add async versions of AWS tests\r\n\r\n* chore: fix missing interface member\r\n\r\n* chore: add missing interface methods\r\n\r\n* chore: async service bus tests\r\n\r\n* fix: confirm that if task scheduler reset correctly, we don't get a spurious callback\r\n\r\n* fix: note concerns on TaskScheduler and ConfigureAwait\r\n\r\n* fix: another pass at exploring the edge case, that causes work to be accidentally queued to our scheduler\r\n\r\n* fix: some fallback approaches in Post\r\n\r\n* Pull one issue with blocking in the Proactor pipeline out.\r\n\r\n* fix: Notes on TaskScheduler and ConfigureAwait\r\n\r\n* fix: update ADR link\r\n\r\n* fix: allow asynchronous producer creation (mainly useful for AWS now)\r\n\r\n* fix: add async tests to Kafka; remove spurious finalize call\r\n\r\n* fix: add async to mqtt\r\n\r\n* fix: add async for mssql\r\n\r\n* fix: Add async Redis tests\r\n\r\n* chore: syntax modernization\r\n\r\n* fix: accidental sample drop","commit-date":"2024-12-29T17:51:50Z","current-rev":"9af3c0b15","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/ReceivedResult.cs","previous-rev":"8438dd9ee","commit-title":"Fix: Sync Over Async Improvements (#3409)","language":"C#","id":"67d626571410b5b5d0fa09b4ccda9573e7d98690","model-score":0.66,"author-id":null,"project-id":32198,"delta-file-score":0.31179166,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/ReceivedResult.cs b/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/ReceivedResult.cs\nindex 1b17df802..65e91b1c5 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/ReceivedResult.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/ReceivedResult.cs\n@@ -27,11 +27,12 @@ public ReceivedResult(bool isDataValid, string jsonContent, string topic, string\n     /// <typeparam name=\"T\">The type of the message</typeparam>\n-    public class ReceivedResult<T> : ReceivedResult\n+    public class ReceivedResult<T>(\n+        bool isDataValid,\n+        string jsonContent,\n+        string topic,\n+        string messageType,\n+        long id,\n+        T? message)\n+        : ReceivedResult(isDataValid, jsonContent, topic, messageType, id)\n     {\n-        public ReceivedResult(bool isDataValid, string jsonContent, string topic, string messageType, long id, T message)\n-            : base(isDataValid, jsonContent, topic, messageType, id)\n-        {\n-            Message = message;\n-        }\n-\n-        public T Message { get; }\n+        public T? Message { get; } = message;\n \n","improvement-type":"Constructor Over-Injection"}],"change-level":"warning","is-hotspot?":false,"line":43,"what-changed":"PulsarSubscription has 23 arguments, max arguments = 5","how-to-fix":"There are multiple ways of addressing constructor over-injection. Sometimes you can introduce [FACADE](https://en.wikipedia.org/wiki/Facade_pattern) services that encapsulate lower-level dependencies.\n\nIn many cases, Constructor Over-Injection is a symptom of a deeper problem. Make sure to investigate the root cause, and get some inspiration and examples from [Mark Seemann's article](https://blog.ploeh.dk/2018/08/27/on-constructor-over-injection/) on the issue.","change-type":"introduced"},{"method":"PulsarSubscription","why-it-occurs":"Counts the maximum number of arguments to each class' constructor. Too many arguments give a lower code health score. The rationale is that many constructor arguments indicate either a unit with low cohesion or an injection of dependencies on the wrong abstraction level.","name":"Constructor Over-Injection","file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarSubscription.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"9","loc-deleted":"8","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* fix: add support for Send to our synchronization context, to allow it to work in more use cases\r\n\r\n* fix: remove blocking wait from consumers that do not have a built in delay function.\r\n\r\n* fix: document where wait completes on the synchronizationcontext via the performer thread.\r\n\r\n* fix: update comments for sync and async, fix one where appropriate\r\n\r\n* feat: add adr describing approach; rename to use reactor and proactor to align with documentation.\r\n\r\n* feat: improve the adr about Brighter's usage of threading.\r\n\r\n* fix: use GetAwaiter().GetResult() for better call stack\r\n\r\n* feat: Update the ADR for IAmAMessageConsumerAsync\r\n\r\n* feat: add IAmAMessageConsumerAsync.cs and initial tests.\r\n\r\n* feat: additional tests for InMemoryConsumerAsync\r\n\r\n* feat: allow proactor to use async methods on transports, if they exist\r\n\r\n* feat: expose async methods from AWS to Proactor.cs\r\n\r\n* fix: nullability of AWS transport\r\n\r\n* fix: add async options to ASB\r\n\r\n* chore: branch switch\r\n\r\n* feat: add async support to Kafka\r\n\r\n* chore: separate summary from remarks\r\n\r\n* feat: add async to RMQ; update to RMQ V7\r\n\r\n* feat: add nullability to RMQ transport\r\n\r\n* feat: add async operations to mssql transport\r\n\r\n* chore: fix nullable issues for MS SQL transport\r\n\r\n* chore: move Redis to using ChannelName and RoutingKey over string, to reduce primitive obsession.\r\n\r\n* feat: upgrade the sychronizationcontext to work with the scheduler, derived from Stephen Cleary work\r\n\r\n* chore: reorg tests around reactor and proactor\r\n\r\n* Shift to run whole loop through synchronizationcontext over parts\r\n\r\n* Make the choice of proactor/reactor easier to understand\r\n\r\n* fix: switch to close implementation of Stephen Cleary's Nito as a starting point; can't use direct as not strong named, and adds depenency we can't fix\r\n\r\n* fix: update ADR to reflect sources changes; update sources to reflect code origins\r\n\r\n* fix: need to be explicit about your pump type if not proactor now\r\n\r\n* fix: license file URI for Stephen Cleary was wrong\r\n\r\n* feat: update ADR to show native support for proactor or reactor\r\n\r\n* fix: match proactor tests to reactor tests to flush issues\r\n\r\n* fix: add duplicates for the tests within Reactor, not in Proactor\r\n\r\n* fix: make rmq broker creation async; add cancellationtoken to sendasync\r\n\r\n* fix: add async tests for RMQ, use to flush out issues with synchronization context, particularly in test scenarios (as runner and framework also have contexts)\r\n\r\n* feat: add tests for internal syncrhonizationcontext, derived from Stephen Cleary\r\n\r\n* fix: tests show that we should remove the reentrancy check; there may be an issue, but this was not how to solve\r\n\r\n* fix: add async disposable pattern to consumer\r\n\r\n* feat: support IAsyncDisposable\r\n\r\n* fix: remove GetAwaiter().GetResult() from hot paths\r\n\r\n* fix: Improve the ADR to reflect results; attribution in the README.md file\r\n\r\n* chore: add some debugging support for thornier issues\r\n\r\n* chore: add better debug statements; helps to diagnose scheduler & context issues. Issue with scheduler being used outside of helper context.\r\n\r\n* chore: add async versions of AWS tests\r\n\r\n* chore: fix missing interface member\r\n\r\n* chore: add missing interface methods\r\n\r\n* chore: async service bus tests\r\n\r\n* fix: confirm that if task scheduler reset correctly, we don't get a spurious callback\r\n\r\n* fix: note concerns on TaskScheduler and ConfigureAwait\r\n\r\n* fix: another pass at exploring the edge case, that causes work to be accidentally queued to our scheduler\r\n\r\n* fix: some fallback approaches in Post\r\n\r\n* Pull one issue with blocking in the Proactor pipeline out.\r\n\r\n* fix: Notes on TaskScheduler and ConfigureAwait\r\n\r\n* fix: update ADR link\r\n\r\n* fix: allow asynchronous producer creation (mainly useful for AWS now)\r\n\r\n* fix: add async tests to Kafka; remove spurious finalize call\r\n\r\n* fix: add async to mqtt\r\n\r\n* fix: add async for mssql\r\n\r\n* fix: Add async Redis tests\r\n\r\n* chore: syntax modernization\r\n\r\n* fix: accidental sample drop","commit-date":"2024-12-29T17:51:50Z","current-rev":"9af3c0b15","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/ReceivedResult.cs","previous-rev":"8438dd9ee","commit-title":"Fix: Sync Over Async Improvements (#3409)","language":"C#","id":"67d626571410b5b5d0fa09b4ccda9573e7d98690","model-score":0.66,"author-id":null,"project-id":32198,"delta-file-score":0.31179166,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/ReceivedResult.cs b/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/ReceivedResult.cs\nindex 1b17df802..65e91b1c5 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/ReceivedResult.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/ReceivedResult.cs\n@@ -27,11 +27,12 @@ public ReceivedResult(bool isDataValid, string jsonContent, string topic, string\n     /// <typeparam name=\"T\">The type of the message</typeparam>\n-    public class ReceivedResult<T> : ReceivedResult\n+    public class ReceivedResult<T>(\n+        bool isDataValid,\n+        string jsonContent,\n+        string topic,\n+        string messageType,\n+        long id,\n+        T? message)\n+        : ReceivedResult(isDataValid, jsonContent, topic, messageType, id)\n     {\n-        public ReceivedResult(bool isDataValid, string jsonContent, string topic, string messageType, long id, T message)\n-            : base(isDataValid, jsonContent, topic, messageType, id)\n-        {\n-            Message = message;\n-        }\n-\n-        public T Message { get; }\n+        public T? Message { get; } = message;\n \n","improvement-type":"Constructor Over-Injection"}],"change-level":"warning","is-hotspot?":false,"line":163,"what-changed":"PulsarSubscription has 22 arguments, max arguments = 5","how-to-fix":"There are multiple ways of addressing constructor over-injection. Sometimes you can introduce [FACADE](https://en.wikipedia.org/wiki/Facade_pattern) services that encapsulate lower-level dependencies.\n\nIn many cases, Constructor Over-Injection is a symptom of a deeper problem. Make sure to investigate the root cause, and get some inspiration and examples from [Mark Seemann's article](https://blog.ploeh.dk/2018/08/27/on-constructor-over-injection/) on the issue.","change-type":"introduced"},{"method":"CreateMessageMetadata","why-it-occurs":"A Complex Method has a high cyclomatic complexity. The recommended threshold for the C# language is a cyclomatic complexity lower than 9.","name":"Complex Method","file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarMessageProducer.cs","refactoring-examples":[{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"1","loc-deleted":"2","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"NJsonSchema represents class inheritance via an embedded\n{ definitions: { Base: {...} }, allOf: [{$ref: \"#/definitions/Base\"}, ...] }\nblock on every derived message schema. The previous output duplicated the base\ntype once per message and rewrote refs to resolve inside each message's payload\nsubtree.\n\nThis commit lifts all definitions/$defs entries to a single shared pool under\ncomponents.schemas, dedupes by name (first-seen content wins), and rewrites refs\nacross both the message bodies and the hoisted definitions themselves so\ncross-definition refs continue to resolve. A class hierarchy like\nPaymentReceivedEvent and OrderCreatedEvent both inheriting Event now produces a\nsingle Event entry under components.schemas, with each message's allOf pointing\nat #/components/schemas/Event.\n\nSide benefits:\n- Smaller output (~18% on the sample documents).\n- Tooling consuming the spec sees the base type once and can generate a shared\n  parent class on the client side.\n\nDrops the last open CodeScene Complex Method finding by collapsing the\nJsonValueKind.True / False arms in JsonElementToObject into a single\nGetBoolean() arm.\n\nTests rewritten to assert the new ref shape (#/components/schemas/X) and the\npresence of the hoisted entries under Components.Schemas.\n\nCo-Authored-By: Claude <noreply@anthropic.com>","commit-date":"2026-05-16T19:26:51Z","current-rev":"f5003d644","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs","previous-rev":"f38857a51","commit-title":"feat(#3828): hoist shared JSON Schema definitions to components.schemas","language":"C#","id":"a281e7924ca426d627f9bb709daa231724381400","model-score":0.92,"author-id":null,"project-id":32198,"delta-file-score":0.31179166,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\nindex 5bc24c0a6..6022aa42b 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiHostExtensions.cs\n@@ -191,4 +191,3 @@ private static string SerializeAsYaml(object? tree)\n             JsonValueKind.Number => ReadNumber(element),\n-            JsonValueKind.True => true,\n-            JsonValueKind.False => false,\n+            JsonValueKind.True or JsonValueKind.False => element.GetBoolean(),\n             _ => null,\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"DevJonny","training-data":{"loc-added":"17","loc-deleted":"14","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.08","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"jonny.ollifflee@gmail.com","commit-full-message":"Extract LoadAssemblyTypes (handles ReflectionTypeLoadException) and\nTryGetPublicationTopic (per-type filter + topic resolution) from\nGetPublicationTopicTypes. Targets the last CodeScene advisory finding\n(Complex Method).\n\n46/46 AsyncAPI tests pass on net9.0 and net10.0.\n\nCo-Authored-By: Claude (claude-opus-4-7) <noreply@anthropic.com>","commit-date":"2026-05-17T06:26:45Z","current-rev":"7f8b45280","filename":"Brighter/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs","previous-rev":"6be065b71","commit-title":"refactor(#3828): split GetPublicationTopicTypes for clarity","language":"C#","id":"2287bc725ce16bc068f9b3178aae023ee15b02f8","model-score":0.5,"author-id":null,"project-id":32198,"delta-file-score":0.33626333,"diff":"diff --git a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\nindex 4a13108c1..6f8bd66a9 100644\n--- a/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n+++ b/src/Paramore.Brighter.AsyncAPI/AsyncApiDocumentGenerator.cs\n@@ -248,6 +248,14 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n         {\n-            Type[] types;\n+            foreach (var type in LoadAssemblyTypes(assembly))\n+            {\n+                var topic = TryGetPublicationTopic(type);\n+                if (topic != null) yield return (type, topic);\n+            }\n+        }\n+\n+        private static IEnumerable<Type> LoadAssemblyTypes(Assembly assembly)\n+        {\n             try\n             {\n-                types = assembly.GetTypes();\n+                return assembly.GetTypes();\n             }\n@@ -255,18 +263,13 @@ private IEnumerable<Assembly> GetAssembliesToScan()\n             {\n-                types = ex.Types.Where(t => t != null).ToArray()!;\n+                return ex.Types.Where(t => t != null).ToArray()!;\n             }\n+        }\n \n-            foreach (var type in types)\n-            {\n-                if (type.IsAbstract || type.IsInterface) continue;\n-                if (!typeof(IRequest).IsAssignableFrom(type)) continue;\n-\n-                var attr = type.GetCustomAttribute<PublicationTopicAttribute>();\n-                if (attr == null) continue;\n-\n-                var topic = attr.Destination?.RoutingKey?.Value;\n-                if (string.IsNullOrEmpty(topic)) continue;\n+        private static string? TryGetPublicationTopic(Type type)\n+        {\n+            if (type.IsAbstract || type.IsInterface) return null;\n+            if (!typeof(IRequest).IsAssignableFrom(type)) return null;\n \n-                yield return (type, topic);\n-            }\n+            var topic = type.GetCustomAttribute<PublicationTopicAttribute>()?.Destination?.RoutingKey?.Value;\n+            return string.IsNullOrEmpty(topic) ? null : topic;\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Ian Cooper","training-data":{"loc-added":"39","loc-deleted":"20","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"ian_hammond_cooper@yahoo.co.uk","commit-full-message":"* chore: fix casing on ADR\n\n* fix: update the ADR for a better understanding of how we propogate context\n\n* chore: update the name of InMemoryMessageProducer.cs to better fit the pattern used elsewhere\n\n* chore: swap Assert.True to Assert.Contains for collections\n\n* feat: failing test checks that we have added trace context to headers of outgoing message\n\n* feat: add a class to support trace state explicity\n\n* feat: modify tests\n\n* chore: filename issue\n\n* chore: switch to assert contains\n\n* feat: ensure that we propogate trace context\n\n* fix: allow trace context to serialize for tests\n\n* fixing broken tests\n\n* feat: add async tests\n\n* feat: add sync version of propogation tests\n\n* chore: whitespace\n\n* feat: add RMQ support for traceparent and tracestate.\n\n* fix: baggage is not tracecontext, although format is similar; baggage is user-defined\n\n* fix: tests were using tracestate not baggage\n\n* fix: crate link spans as MS committed change; propogate tracecontext and baggage through pipeline\n\n* fix: adjust test to ensure baggage on parent activity\n\n* fix: rename test class\n\n* chore: add tracestring and baggage to test\n\n* fix RMQ propogates the context\n\n* chore: port between RMQ instances\n\n* fix: add trace propogation to RMQ\n\n* fix: add kafka context propogation\n\n* chore: Add some primitive types to avoid primitive obsession warnings from codescene\n\n* chore: missing XML comments on public methods\n\n* chore: lower primitive obsession for message and request id\n\n* chore: usages of content type for new header\n\n* fix: adjust for reply to as a routing key\n\n* fix: add serialization for new value objects\n\n* chore: adjust docs\n\n* fix: issues with serialization of new primitive types\n\n* fix: ContentType.cs value may be null in constructor\n\n* fix: increase test delay as fragile\n\n* Update src/Paramore.Brighter/Id.cs\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>\n\n* fix: need to convert header types to primitives for bag\n\n* fix: make sync RMQ tests serial for reliability\n\n* fix: refactor LLM code\n\n* chore: move converters to own directory, add them for NewtonSoft for Kafka serdes\n\n* chore: remove spurious directory\n\n* fix: use a string base type for Id in a request, to make serializing it easily.\n\n* chore: unneeded namespace on attribute\n\n* feat: adding cloud events to Redis\n\n* fix: tests failing due to bad test string, does not use cloud events names\n\n* feat: add additional tests of persistence of message headers for new properties\n\n* fix: ASB tests need to check all properties\n\n* feat: use MS ContentType as used on public SDK and user defined type is awkward in that context (although MS type has limitations)\n\n* fix: body equality test not working\n\n* fix: errors with tests around cloud events transform; need to use strings, similar to params\n\n* fix: use charset with MS contenttype\n\n* feat: adding cloudevents support to ASB\n\n* feat: add cloud events and tracing to ASB\n\n* fix: correct tests for contenttype changes and cloudevents json specification\n\n* fix: correct for new content type\n\n---------\n\nCo-authored-by: Rafael Lillo <rafael.andrade@justeattakeaway.com>","commit-date":"2025-06-14T15:20:36Z","current-rev":"bc93b511c","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs","previous-rev":"5767656ad","commit-title":"OTel Transports (#3605)","language":"C#","id":"4136886c0cbe98b6d426b5645ff6ddf0f5b82421","model-score":0.49,"author-id":null,"project-id":32198,"delta-file-score":0.4636132,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\nindex 65e3b1f20..18a8d7a88 100644\n--- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs\n@@ -4,2 +4,3 @@\n using System.Net;\n+using System.Net.Mime;\n using System.Text.Json;\n@@ -10,3 +11,5 @@\n using Microsoft.Extensions.Logging;\n+using Newtonsoft.Json;\n using Paramore.Brighter.Extensions;\n+using Paramore.Brighter.JsonConverters;\n using Paramore.Brighter.Logging;\n@@ -48,2 +51,16 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n     public async Task<string?> SendAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken)\n+    {\n+        var request = CreateSendMessageRequest(message, delay);\n+\n+        var response = await _client.SendMessageAsync(request, cancellationToken);\n+        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n+            or HttpStatusCode.Accepted)\n+        {\n+            return response.MessageId;\n+        }\n+\n+        return null;\n+    }\n+\n+    private SendMessageRequest CreateSendMessageRequest(Message message, TimeSpan? delay)\n     {\n@@ -51,3 +68,3 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            QueueUrl = _queueUrl, \n+            QueueUrl = _queueUrl,\n             MessageBody = message.Body.Value\n@@ -55,2 +72,11 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n+        SetMessageDelay(request, delay);\n+        SetFifoQueueProperties(request, message);\n+        SetMessageAttributes(request, message);\n+\n+        return request;\n+    }\n+\n+    private void SetMessageDelay(SendMessageRequest request, TimeSpan? delay)\n+    {\n         delay ??= TimeSpan.Zero;\n@@ -58,3 +84,2 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         {\n-            // SQS has a hard limit of 15min for Delay in Seconds\n             if (delay.Value > s_maxDelay)\n@@ -67,27 +92,30 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n         }\n+    }\n \n-        if (_queueType == SqsType.Fifo)\n+    private void SetFifoQueueProperties(SendMessageRequest request, Message message)\n+    {\n+        if (_queueType != SqsType.Fifo) return;\n+        request.MessageGroupId = message.Header.PartitionKey;\n+        if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n         {\n-            request.MessageGroupId = message.Header.PartitionKey;\n-            if (message.Header.Bag.TryGetValue(HeaderNames.DeduplicationId, out var deduplicationId))\n-            {\n-                request.MessageDeduplicationId = (string)deduplicationId;\n-            }\n+            request.MessageDeduplicationId = (string)deduplicationId;\n         }\n-        \n-        // Combine cloud event headers into a single JSON object\n+    }\n+\n+    private void SetMessageAttributes(SendMessageRequest request, Message message)\n+    {\n         string cloudEventHeadersJson = CreateCloudEventHeadersJson(message);\n \n-        // we can set up to 10 attributes;  we use a single JSON object as the cloud event headers; we can set nine others directly \n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var messageAttributes = new Dictionary<string, MessageAttributeValue>\n-        { \n-            [HeaderNames.Id] = new (){ StringValue = message.Header.MessageId, DataType = \"String\" },\n+        {\n+            [HeaderNames.Id] = new() { StringValue = message.Header.MessageId, DataType = \"String\" },\n             [HeaderNames.CloudEventHeaders] = new() { StringValue = cloudEventHeadersJson, DataType = \"String\" },\n-            [HeaderNames.Topic] = new() { StringValue = _queueUrl ,DataType = \"String\" },\n+            [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = \"String\" },\n             [HeaderNames.MessageType] = new() { StringValue = message.Header.MessageType.ToString(), DataType = \"String\" },\n-            [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = \"String\" },\n+            [HeaderNames.ContentType] = new() { StringValue = contentType.ToString(), DataType = \"String\" },\n             [HeaderNames.Timestamp] = new() { StringValue = Convert.ToString(message.Header.TimeStamp.ToRfc3339()), DataType = \"String\" }\n         };\n-        \n-        if (!string.IsNullOrEmpty(message.Header.ReplyTo))\n+\n+        if (!RoutingKey.IsNullOrEmpty(message.Header.ReplyTo))\n             messageAttributes.Add(HeaderNames.ReplyTo, new MessageAttributeValue { StringValue = message.Header.ReplyTo, DataType = \"String\" });\n@@ -97,20 +125,10 @@ public SqsMessageSender(string queueUrl, SqsType queueType, AmazonSQSClient clie\n \n-        if (!string.IsNullOrEmpty(message.Header.CorrelationId))\n-            messageAttributes.Add(HeaderNames.CorrelationId,  new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n-        \n-        //we have to add some attributes into our bag, to prevent overloading the message attributes\n+        if (!Id.IsNullOrEmpty(message.Header.CorrelationId))\n+            messageAttributes.Add(HeaderNames.CorrelationId, new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = \"String\" });\n+\n         message.Header.Bag[HeaderNames.HandledCount] = message.Header.HandledCount.ToString(CultureInfo.InvariantCulture);\n-        \n-        var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n+\n+        var bagJson = System.Text.Json.JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);\n         messageAttributes[HeaderNames.Bag] = new() { StringValue = bagJson, DataType = \"String\" };\n         request.MessageAttributes = messageAttributes;\n-\n-        var response = await _client.SendMessageAsync(request, cancellationToken);\n-        if (response.HttpStatusCode is HttpStatusCode.OK or HttpStatusCode.Created\n-            or HttpStatusCode.Accepted)\n-        {\n-            return response.MessageId;\n-        }\n-\n-        return null;\n     }\n@@ -119,2 +137,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n     {\n+        var contentType = message.Header.ContentType ?? new ContentType(MediaTypeNames.Text.Plain);\n         var cloudEventHeaders = new Dictionary<string, string>\n@@ -122,3 +141,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n             [HeaderNames.Id] = Convert.ToString(message.Header.MessageId),\n-            [HeaderNames.DataContentType] = message.Header.ContentType ?? \"plain/text\",\n+            [HeaderNames.DataContentType] = contentType.ToString(),\n             [HeaderNames.DataSchema] = message.Header.DataSchema?.ToString() ?? string.Empty,\n@@ -139,3 +158,3 @@ private static string CreateCloudEventHeadersJson(Message message)\n \n-        var cloudEventHeadersJson = JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n+        var cloudEventHeadersJson = System.Text.Json.JsonSerializer.Serialize(cloudEventHeaders, JsonSerialisationOptions.Options);\n         return cloudEventHeadersJson;\n@@ -149,2 +168 @@ private static partial class Log\n }\n-\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Steve Bush","training-data":{"loc-added":"46","loc-deleted":"63","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.17","delta-n-functions":"0","current-file-score":"10.0"},"author-email":"stevebu@bushchang.com","commit-full-message":"* Refactoring of MQTT Support to use 4.3 MQTTnet\n\n* Removed MQTTnet v5 support for UserProperties.  Removed Comments.\n\n* Fix logging of Exception when Test context is not available\n\n* Increase wait to 1 sec before testing for number of messages\n\n* SpinUntil instead of trying to time when requests will be processed\n\n* Update target platforms for test helpers to net8.0;net9.0\n\n* Adjust timeout to see if all messages are processed.\n\n* Reworking logging to reduce logging for routine message delivery\n\n* Fixed static Log class merge errors\n\n* Fix up passing of exception and port to new Log API.","commit-date":"2025-04-09T08:57:56Z","current-rev":"099708800","filename":"Brighter/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs","previous-rev":"0741b9ef1","commit-title":"feature: refactoring of MQTT Support to use 4.3 MQTTnet (#3578)","language":"C#","id":"d27e8bc6b9d615c647eca3faefa2e0371bd11b02","model-score":0.3,"author-id":null,"project-id":32198,"delta-file-score":0.51462585,"diff":"diff --git a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\nindex 974658e30..dfeca6cff 100644\n--- a/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n+++ b/src/Paramore.Brighter.MessagingGateway.MQTT/MQTTMessageConsumer.cs\n@@ -1,5 +1,3 @@\n ﻿using System;\n-using System.Buffers;\n using System.Collections.Generic;\n-using System.Text;\n using System.Text.Json;\n@@ -9,2 +7,3 @@\n using MQTTnet;\n+using MQTTnet.Client;\n using MQTTnet.Packets;\n@@ -16,12 +15,12 @@ namespace Paramore.Brighter.MessagingGateway.MQTT\n     /// <summary>\n-    /// Class MQTTMessageConsumer.\n-    /// The <see cref=\"MQTTMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n+    /// Class MqttMessageConsumer.\n+    /// The <see cref=\"MqttMessageConsumer\"/> is used on the server to receive messages from the broker. It abstracts away the details of \n     /// inter-process communication tasks from the server. It handles subscription establishment, request reception and dispatching.\n     /// </summary>\n-    public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n+    public partial class MqttMessageConsumer : IAmAMessageConsumerSync, IAmAMessageConsumerAsync\n     {\n         private readonly string _topic;\n-        private readonly Queue<Message> _messageQueue = new Queue<Message>();\n-        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MQTTMessageConsumer>();\n-        private readonly Message _noopMessage = new Message();\n+        private readonly Queue<Message> _messageQueue = new();\n+        private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<MqttMessageConsumer>();\n+        private readonly Message _noopMessage = new();\n         private readonly IMqttClient _mqttClient;\n@@ -30,10 +29,22 @@ public partial class MQTTMessageConsumer : IAmAMessageConsumerSync, IAmAMessageC\n         /// <summary>\n-        /// Initializes a new instance of the <see cref=\"MQTTMessageConsumer\" /> class.\n-        /// Sync over Async within constructor\n+        /// Initializes a new instance of the <see cref=\"MqttMessageConsumer\"/> class.\n         /// </summary>\n-        /// <param name=\"configuration\"></param>\n-        public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configuration)\n+        /// <param name=\"configuration\">\n+        /// The configuration settings for the MQTT message consumer, including connection details, \n+        /// topic prefix, client credentials, and other options.\n+        /// </param>\n+        /// <exception cref=\"ArgumentNullException\">\n+        /// Thrown when the <paramref name=\"configuration.TopicPrefix\"/> is null.\n+        /// </exception>\n+        /// <remarks>\n+        /// This constructor sets up the MQTT client with the provided configuration, establishes \n+        /// the connection to the broker, and subscribes to the specified topic.\n+        ///\n+        /// 04/03/2025:\n+        ///     - Removed support for user properties as they are not supported in v3.1.1 of the MQTT protocol.\n+        /// </remarks>\n+        public MqttMessageConsumer(MqttMessagingGatewayConsumerConfiguration configuration)\n         {\n-            _topic =  $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n-            \n+            _topic = $\"{configuration.TopicPrefix}/#\" ?? throw new ArgumentNullException(nameof(configuration.TopicPrefix));\n+\n             MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder()\n@@ -52,7 +63,9 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n \n-            _mqttClientOptions = mqttClientOptionsBuilder.Build();\n+            _mqttClientOptions = mqttClientOptionsBuilder\n+                .WithTcpServer(configuration.Hostname, configuration.Port)\n+                .Build();\n \n-            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, recieve etc.\n+            //TODO: Switch to using the low level client here, as it allows us explicit control over ack, receive etc.\n             //This is slated for post V10, for now, we just want to upgrade this support the V10 release\n-            _mqttClient = new MqttClientFactory().CreateMqttClient();\n+            _mqttClient = new MqttFactory().CreateMqttClient();\n \n@@ -61,36 +74,4 @@ public MQTTMessageConsumer(MQTTMessagingGatewayConsumerConfiguration configurati\n                 Log.MqttMessageConsumerReceivedMessage(s_logger, configuration.TopicPrefix);\n-                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.Payload.ToArray(), JsonSerialisationOptions.Options);\n-                foreach (MqttUserProperty property in e.ApplicationMessage.UserProperties)\n-                {\n-                    if (property.Name == HeaderNames.Type)\n-                    {\n-                        message.Header.Type = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.SpecVersion)\n-                    {\n-                        message.Header.SpecVersion = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.Source)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var source))\n-                        {\n-                            message.Header.Source = source;\n-                        }\n-                    }\n-                    else if (property.Name == HeaderNames.Subject)\n-                    {\n-                        message.Header.Subject = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataContentType)\n-                    {\n-                        message.Header.ContentType = property.Value;\n-                    }\n-                    else if (property.Name == HeaderNames.DataSchema)\n-                    {\n-                        if (Uri.TryCreate(property.Value, UriKind.RelativeOrAbsolute, out var dataSchema))\n-                        {\n-                            message.Header.DataSchema = dataSchema;\n-                        }\n-                    }\n-                }\n+                var message = JsonSerializer.Deserialize<Message>(e.ApplicationMessage.PayloadSegment.ToArray(), JsonSerialisationOptions.Options);\n+\n                 _messageQueue.Enqueue(message);\n@@ -112,3 +93,3 @@ public void Acknowledge(Message message)\n         }\n-        \n+\n         /// <summary>\n@@ -117,3 +98,3 @@ public void Acknowledge(Message message)\n         /// <param name=\"message\"></param>\n-        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task AcknowledgeAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -127,8 +108,8 @@ public void Dispose()\n         }\n-        \n-        \n+\n+\n         public ValueTask DisposeAsync()\n         {\n-           _mqttClient.Dispose();\n-           return new ValueTask(Task.CompletedTask);\n+            _mqttClient.Dispose();\n+            return new ValueTask(Task.CompletedTask);\n         }\n@@ -142,3 +123,3 @@ public void Purge()\n         }\n-        \n+\n         /// <summary>\n@@ -147,6 +128,6 @@ public void Purge()\n         /// <param name=\"cancellationToken\">Allows cancellation of the purge task</param>\n-        public Task PurgeAsync(CancellationToken cancellationToken = default(CancellationToken))\n+        public Task PurgeAsync(CancellationToken cancellationToken = default)\n         {\n-           Purge();\n-           return Task.CompletedTask;\n+            Purge();\n+            return Task.CompletedTask;\n         }\n@@ -159,4 +140,6 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         {\n-            if (_messageQueue.Count==0)\n+            if (_messageQueue.Count == 0)\n+            {\n                 return new[] { _noopMessage };\n+            }\n \n@@ -175,5 +158,5 @@ public Message[] Receive(TimeSpan? timeOut = null)\n                     }\n-                    catch (TimeoutException)\n+                    catch (TimeoutException te)\n                     {\n-                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, _messageQueue.Count);\n+                        Log.MqttMessageConsumerTimedOutRetrievingMessages(s_logger, te, _messageQueue.Count);\n                     }\n@@ -184,4 +167,4 @@ public Message[] Receive(TimeSpan? timeOut = null)\n         }\n-        \n-        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default(CancellationToken))\n+\n+        public Task<Message[]> ReceiveAsync(TimeSpan? timeOut = null, CancellationToken cancellationToken = default)\n         {\n@@ -203,3 +186,3 @@ public void Reject(Message message)\n         /// <param name=\"cancellationToken\"></param>\n-        public Task RejectAsync(Message message, CancellationToken cancellationToken = default(CancellationToken))\n+        public Task RejectAsync(Message message, CancellationToken cancellationToken = default)\n         {\n@@ -218,5 +201,5 @@ public bool Requeue(Message message, TimeSpan? delay = null)\n         }\n-        \n+\n         public Task<bool> RequeueAsync(Message message, TimeSpan? delay = null,\n-            CancellationToken cancellationToken = default(CancellationToken))\n+            CancellationToken cancellationToken = default)\n         {\n@@ -239,5 +222,5 @@ private async Task Connect(int connectionAttempts)\n                 }\n-                catch (Exception)\n+                catch (Exception ex)\n                 {\n-                    Log.UnableToConnectMqttConsumerClient(s_logger);\n+                    Log.UnableToConnectMqttConsumerClient(s_logger, ex);\n                 }\n@@ -248,7 +231,7 @@ private static partial class Log\n         {\n-            [LoggerMessage(LogLevel.Information, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n+            [LoggerMessage(LogLevel.Trace, \"MQTTMessageConsumer: Received message from queue {TopicPrefix}\")]\n             public static partial void MqttMessageConsumerReceivedMessage(ILogger logger, object topicPrefix);\n \n-            [LoggerMessage(LogLevel.Warning, \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n-            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, int queueLength);\n+            [LoggerMessage(Level = LogLevel.Warning, Message = \"MQTTMessageConsumer: Timed out retrieving messages.  Queue length: {QueueLength}\")]\n+            public static partial void MqttMessageConsumerTimedOutRetrievingMessages(ILogger logger, Exception ex, int queueLength);\n \n@@ -259,5 +242,5 @@ private static partial class Log\n             public static partial void SubscribedToTopic(ILogger logger, string topic);\n-            \n-            [LoggerMessage(LogLevel.Error, \"Unable to connect MQTT Consumer Client\")]\n-            public static partial void UnableToConnectMqttConsumerClient(ILogger logger);\n+\n+            [LoggerMessage(Level = LogLevel.Error, Message = \"Unable to connect MQTT Consumer Client\")]\n+            public static partial void UnableToConnectMqttConsumerClient(ILogger logger, Exception ex);\n         }\n","improvement-type":"Complex Method"},{"architectural-component-id":null,"author-name":"Tom Longhurst","training-data":{"loc-added":"74","loc-deleted":"33","delta-cc-mean":"0.0","delta-cc-total":"0","delta-penalties":"1.0","delta-n-functions":"0","current-file-score":"9.387218218812514"},"author-email":"30480171+thomhurst@users.noreply.github.com","commit-full-message":"* fix: serialise Dispatcher.Start so consumers are Open before Receive() returns (#4075)\n\nDispatcher.Start() flipped State to DS_RUNNING before opening consumers, so\nReceive()'s busy-wait could return while some consumers were still Shut. A\nShut()/End() racing into that window no-op'd against not-yet-Open consumers\n(Consumer.Shut only acts when State == Open). The control task then opened\nthose consumers, leaving orphan performers parked in Task.Delay forever and\nhanging End() in Task.WaitAny.\n\nOpen every consumer and register its task before publishing DS_RUNNING, and\nreplace the 100ms poll with a TaskCompletionSource that gives callers an\nexplicit happens-before edge over the opens. Add a lock + pending-shut flag\non Consumer so a Shut() arriving before Open() is honoured (defence in depth\nfor any future caller path that opens off the control task).\n\n* chore: trim verbose comments on #4075 fix\n\n* refactor: flatten Dispatcher.Start nesting (CodeScene Bumpy Road)\n\nExtract control-loop body into RunControlLoop, then split into\nTryOpenConsumers, WaitForPerformersToStop, HandleNextStoppedPerformer,\nand RemoveConsumerForTask. Each helper has at most one level of\nconditional nesting, killing the \"Bumpy Road Ahead\" flag on\nPR #4081 without changing behavior.\n\n* refactor: simplify TryOpenConsumers and drop redundant OfType\n\n- Collapse TryOpenConsumers (bool, dead false-branch) into void\n  OpenConsumers; move TCS signalling and try/catch up into\n  RunControlLoop where they belong.\n- Iterate Consumers directly instead of OfType<Consumer>(); the\n  IAmAConsumer interface already exposes Open/Job/JobId.\n\n---------\n\nCo-authored-by: Ian Cooper <ian_hammond_cooper@yahoo.co.uk>","commit-date":"2026-04-26T17:18:29Z","current-rev":"f94246cd8","filename":"Brighter/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs","previous-rev":"3a0528638","commit-title":"fix: Dispatcher shutdown race leaves late-opened Reactor consumers running (#4075) (#4081)","language":"C#","id":"b453d2853c4228105b707fea96483f93cd8ee3c1","model-score":0.1,"author-id":null,"project-id":32198,"delta-file-score":0.84183866,"diff":"diff --git a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\nindex 8abdbe43d..a9ca4b575 100644\n--- a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n+++ b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs\n@@ -284,63 +284,104 @@ private void Start()\n         {\n-            _controlTask = Task.Factory.StartNew(() =>\n+            // Block Start() callers until every consumer is Open. A Shut()/End() racing in\n+            // immediately after Receive() returns must not see a still-Shut consumer, or the\n+            // late-opened performer leaks and End() hangs forever in Task.WaitAny.\n+            var startup = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);\n+\n+            _controlTask = Task.Factory.StartNew(\n+                () => RunControlLoop(startup),\n+                CancellationToken.None,\n+                TaskCreationOptions.LongRunning,\n+                TaskScheduler.Default);\n+\n+            startup.Task.GetAwaiter().GetResult();\n+        }\n+\n+        private void RunControlLoop(TaskCompletionSource<bool> startup)\n+        {\n+            if (State != DispatcherState.DS_AWAITING && State != DispatcherState.DS_STOPPED)\n             {\n-                if (State == DispatcherState.DS_AWAITING || State == DispatcherState.DS_STOPPED)\n-                {\n-                    Log.DispatcherStarting(s_logger);\n-                    State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+                return;\n+            }\n+\n+            Log.DispatcherStarting(s_logger);\n+\n+            try\n+            {\n+                OpenConsumers();\n+                State = DispatcherState.DS_RUNNING;\n+                startup.TrySetResult(true);\n+            }\n+            catch (Exception ex)\n+            {\n+                Log.ErrorOnConsumer(s_logger, ex);\n+                startup.TrySetException(ex);\n+                throw;\n+            }\n+\n+            Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n \n-                    var consumers = Consumers.ToArray();\n-                    consumers.Each(consumer => consumer.Open());\n-                    consumers.Each(consumer => _tasks.TryAdd(consumer.JobId, consumer.Job!));\n+            WaitForPerformersToStop();\n+\n+            State = DispatcherState.DS_STOPPED;\n+            Log.DispatcherStopped(s_logger);\n+        }\n \n-                    Log.DispatcherStartingPerformers(s_logger, _tasks.Count);\n+        private void OpenConsumers()\n+        {\n+            foreach (var consumer in Consumers)\n+            {\n+                consumer.Open();\n+                if (consumer.Job is not null)\n+                    _tasks.TryAdd(consumer.JobId, consumer.Job);\n+            }\n+        }\n \n-                    while (_tasks.Any())\n+        private void WaitForPerformersToStop()\n+        {\n+            while (!_tasks.IsEmpty)\n+            {\n+                try\n+                {\n+                    HandleNextStoppedPerformer();\n+                }\n+                catch (AggregateException ae)\n+                {\n+                    ae.Handle(ex =>\n                     {\n-                        try\n-                        {\n-                            var runningTasks = _tasks.Values.ToArray();\n-                            var index = Task.WaitAny(runningTasks);\n-                            var stoppingConsumer = runningTasks[index];\n-                            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n-\n-                            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n-                            if (consumer != null)\n-                            {\n-                                Log.RemovingConsumer(s_logger, consumer.Name);\n-\n-                                if (_consumers.TryRemove(consumer.Name, out consumer))\n-                                {\n-                                    consumer.Dispose();\n-                                }\n-                            }\n-\n-                            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n-                            {\n-                                removedTask?.Dispose();\n-                            }\n-\n-                            stoppingConsumer.Dispose();\n-                        }\n-                        catch (AggregateException ae)\n-                        {\n-                            ae.Handle(ex =>\n-                            {\n-                                Log.ErrorOnConsumer(s_logger, ex);\n-                                return true;\n-                            });\n-                        }\n-                    }\n-\n-                    State = DispatcherState.DS_STOPPED;\n-                    Log.DispatcherStopped(s_logger);\n+                        Log.ErrorOnConsumer(s_logger, ex);\n+                        return true;\n+                    });\n                 }\n-            },\n-            CancellationToken.None,\n-            TaskCreationOptions.LongRunning,\n-            TaskScheduler.Default);\n+            }\n+        }\n+\n+        private void HandleNextStoppedPerformer()\n+        {\n+            var runningTasks = _tasks.Values.ToArray();\n+            var index = Task.WaitAny(runningTasks);\n+            var stoppingConsumer = runningTasks[index];\n+            Log.PerformerStopped(s_logger, stoppingConsumer.Status);\n+\n+            RemoveConsumerForTask(stoppingConsumer);\n+\n+            if (_tasks.TryRemove(stoppingConsumer.Id, out var removedTask))\n+            {\n+                removedTask?.Dispose();\n+            }\n+\n+            stoppingConsumer.Dispose();\n+        }\n+\n+        private void RemoveConsumerForTask(Task stoppingConsumer)\n+        {\n+            var consumer = Consumers.SingleOrDefault(c => c.JobId == stoppingConsumer.Id);\n+            if (consumer is null)\n+                return;\n+\n+            Log.RemovingConsumer(s_logger, consumer.Name);\n \n-            while (State != DispatcherState.DS_RUNNING)\n+            if (_consumers.TryRemove(consumer.Name, out consumer))\n             {\n-                Thread.Sleep(100); //Block main Dispatcher thread whilst control plane starts\n+                consumer.Dispose();\n             }\n","improvement-type":"Complex Method"}],"change-level":"warning","is-hotspot?":false,"line":92,"what-changed":"CreateMessageMetadata has a cyclomatic complexity of 10, threshold = 9","how-to-fix":"There are many reasons for Complex Method. Sometimes, another design approach is beneficial such as a) modeling state using an explicit state machine rather than conditionals, or b) using table lookup rather than long chains of logic. In other scenarios, the function can be split using [EXTRACT FUNCTION](https://refactoring.com/catalog/extractFunction.html). Just make sure you extract natural and cohesive functions. Complex Methods can also be addressed by identifying complex conditional expressions and then using the [DECOMPOSE CONDITIONAL](https://refactoring.com/catalog/decomposeConditional.html) refactoring.","change-type":"introduced"},{"why-it-occurs":"Duplicated code often leads to code that's harder to change since the same logical change has to be done in multiple functions. More duplication gives lower code health.","name":"Code Duplication","file":"src/Paramore.Brighter.MessagingGateway.Pulsar/PulsarMessageConsumer.cs","refactoring-examples":null,"change-level":"warning","is-hotspot?":false,"line":51,"what-changed":"The module contains 3 functions with similar structure: AcknowledgeAsync,RejectAsync,RequeueAsync","how-to-fix":"A certain degree of duplicated code might be acceptable. The problems start when it is the same behavior that is duplicated across the functions in the module, ie. a violation of the Don't Repeat Yourself (DRY) principle. DRY violations lead to code that is changed together in predictable patterns, which is both expensive and risky. DRY violations can be identified using CodeScene's X-Ray analysis to detect clusters of change coupled functions with high code similarity. [Read More](https://codescene.com/blog/software-revolution-part3/)\n\nOnce you have identified the similarities across functions, look to extract and encapsulate the concept that varies into its own function(s). These shared abstractions can then be re-used, which minimizes the amount of duplication and simplifies change.","change-type":"introduced"}]},"positive-impact-count":0,"repo":"Brighter","code-health":9.357123597174906,"version":"3.0","authors":["Rafael Lillo","Rafael Andrade"],"directives":{"added":[],"removed":[]},"positive-findings":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"notices":{"number-of-types":0,"number-of-files-touched":0,"findings":[]},"external-review-provider":"GitHub"},"analysistime":"2025-12-03T09:54:44.000Z","project-name":"Brighter","repository":"https://github.com/BrighterCommand/Brighter.git"}}