diff --git a/src/net/KNet/Specific/Consumer/ConsumerRecord.cs b/src/net/KNet/Specific/Consumer/ConsumerRecord.cs index 9d4027616b..b3d9a6c559 100644 --- a/src/net/KNet/Specific/Consumer/ConsumerRecord.cs +++ b/src/net/KNet/Specific/Consumer/ConsumerRecord.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Consumer /// The value type /// The JVM type of /// The JVM type of - public class ConsumerRecord : IGenericSerDesFactoryApplier, IDisposable + public class ConsumerRecord : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { IDeserializer _keyDeserializer; IDeserializer _valueDeserializer; @@ -65,6 +65,8 @@ internal ConsumerRecord(Org.Apache.Kafka.Clients.Consumer.ConsumerRecord + public Org.Apache.Kafka.Clients.Consumer.ConsumerRecord InnerReference => _record; volatile int _disposed; // 0 = live, 1 = disposed /// diff --git a/src/net/KNet/Specific/Consumer/ConsumerRecords.cs b/src/net/KNet/Specific/Consumer/ConsumerRecords.cs index 64cf615e9d..e34fa06417 100644 --- a/src/net/KNet/Specific/Consumer/ConsumerRecords.cs +++ b/src/net/KNet/Specific/Consumer/ConsumerRecords.cs @@ -31,7 +31,7 @@ namespace MASES.KNet.Consumer /// The value type /// The JVM type of /// The JVM type of - public class ConsumerRecords : IEnumerable>, IAsyncEnumerable>, IDisposable + public class ConsumerRecords : IKNetInnerReference>, IEnumerable>, IAsyncEnumerable>, IDisposable { readonly ISerDes _keyDeserializer; readonly ISerDes _valueDeserializer; @@ -49,6 +49,9 @@ internal ConsumerRecords(Org.Apache.Kafka.Clients.Consumer.ConsumerRecords + public Org.Apache.Kafka.Clients.Consumer.ConsumerRecords InnerReference => _records; + volatile int _disposed; // 0 = live, 1 = disposed /// /// Test if this instance was disposed diff --git a/src/net/KNet/Specific/Consumer/ConsumerRecordsEnumerator.cs b/src/net/KNet/Specific/Consumer/ConsumerRecordsEnumerator.cs index bf9ef223a2..d32e07fa27 100644 --- a/src/net/KNet/Specific/Consumer/ConsumerRecordsEnumerator.cs +++ b/src/net/KNet/Specific/Consumer/ConsumerRecordsEnumerator.cs @@ -24,7 +24,7 @@ namespace MASES.KNet.Consumer { - class ConsumerRecordsEnumerator : IEnumerator>, IAsyncEnumerator> + class ConsumerRecordsEnumerator : IKNetInnerReference>, IEnumerator>, IAsyncEnumerator> { readonly IDeserializer _keyDeserializer; readonly IDeserializer _valueDeserializer; @@ -50,6 +50,9 @@ public ConsumerRecordsEnumerator(Org.Apache.Kafka.Clients.Consumer.ConsumerRecor _cancellationToken = cancellationToken; } + /// + public Org.Apache.Kafka.Clients.Consumer.ConsumerRecords InnerReference => _records; + ConsumerRecord IAsyncEnumerator>.Current => new ConsumerRecord(_recordAsyncEnumerator.Current, _keyDeserializer, _valueDeserializer, false); ConsumerRecord IEnumerator>.Current => new ConsumerRecord(_recordEnumerator.Current, _keyDeserializer, _valueDeserializer, false); diff --git a/src/net/KNet/Specific/IKNetInnerReference.cs b/src/net/KNet/Specific/IKNetInnerReference.cs new file mode 100644 index 0000000000..c3f4ad82ec --- /dev/null +++ b/src/net/KNet/Specific/IKNetInnerReference.cs @@ -0,0 +1,38 @@ +/* +* Copyright (c) 2021-2026 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace MASES.KNet +{ + /// + /// Defines a way to retrieve the underlying inner reference used from KNet object instance + /// + /// The inner reference type + public interface IKNetInnerReference + { + /// + /// The underlying inner reference used from KNet object instance + /// + T InnerReference { get; } + } +} diff --git a/src/net/KNet/Specific/Streams/KNetStreams.cs b/src/net/KNet/Specific/Streams/KNetStreams.cs index ea4269e277..6d4ac3836e 100644 --- a/src/net/KNet/Specific/Streams/KNetStreams.cs +++ b/src/net/KNet/Specific/Streams/KNetStreams.cs @@ -28,7 +28,7 @@ namespace MASES.KNet.Streams /// /// KNet extension of /// - public class KNetStreams : IGenericSerDesFactoryApplier, IDisposable + public class KNetStreams : IKNetInnerReference, IGenericSerDesFactoryApplier, IDisposable { readonly Java.Util.Properties _properties; readonly Org.Apache.Kafka.Streams.KafkaStreams _inner; @@ -94,6 +94,9 @@ public KNetStreams(Topology arg0, StreamsConfigBuilder arg1) } #endregion + /// + public Org.Apache.Kafka.Streams.KafkaStreams InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/KeyValue.cs b/src/net/KNet/Specific/Streams/KeyValue.cs index a9c08f4d2c..3622d64591 100644 --- a/src/net/KNet/Specific/Streams/KeyValue.cs +++ b/src/net/KNet/Specific/Streams/KeyValue.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams /// The value type /// The JVM type of /// The JVM type of - public sealed class KeyValue : IGenericSerDesFactoryApplier, IDisposable + public sealed class KeyValue : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly KeyValueSupport _inner = null; K _key; @@ -68,6 +68,9 @@ internal KeyValue(IGenericSerDesFactory factory, } } + /// + public KeyValueSupport InnerReference => _inner; + volatile int _disposed; // 0 = live, 1 = disposed /// /// Test if this instance was disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/Branched.cs b/src/net/KNet/Specific/Streams/Kstream/Branched.cs index 8386fc7ca4..d7af56fa4a 100644 --- a/src/net/KNet/Specific/Streams/Kstream/Branched.cs +++ b/src/net/KNet/Specific/Streams/Kstream/Branched.cs @@ -31,7 +31,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class Branched : IGenericSerDesFactoryApplier, IDisposable + public class Branched : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Kstream.Branched _inner; IGenericSerDesFactory _factory; @@ -42,6 +42,9 @@ public class Branched : IGenericSerDesFactoryApplier, IDispo _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.Branched InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/CogroupedKStream.cs b/src/net/KNet/Specific/Streams/Kstream/CogroupedKStream.cs index 521852ca4c..96abfe1450 100644 --- a/src/net/KNet/Specific/Streams/Kstream/CogroupedKStream.cs +++ b/src/net/KNet/Specific/Streams/Kstream/CogroupedKStream.cs @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class CogroupedKStream : IGenericSerDesFactoryApplier, IDisposable + public class CogroupedKStream : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { Org.Apache.Kafka.Streams.Kstream.CogroupedKStream _inner; @@ -43,6 +43,9 @@ internal CogroupedKStream(IGenericSerDesFactory factory, Org.Apache.Kafka.Stream _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.CogroupedKStream InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/Consumed.cs b/src/net/KNet/Specific/Streams/Kstream/Consumed.cs index df1111e4c2..6dac38d1b7 100644 --- a/src/net/KNet/Specific/Streams/Kstream/Consumed.cs +++ b/src/net/KNet/Specific/Streams/Kstream/Consumed.cs @@ -31,7 +31,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class Consumed : IGenericSerDesFactoryApplier, IDisposable + public class Consumed : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { ISerDes _keySerdes; ISerDes _valueSerdes; @@ -54,6 +54,9 @@ IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.Consumed InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/GlobalKTable.cs b/src/net/KNet/Specific/Streams/Kstream/GlobalKTable.cs index 08eaa15ddc..7c1dbe6663 100644 --- a/src/net/KNet/Specific/Streams/Kstream/GlobalKTable.cs +++ b/src/net/KNet/Specific/Streams/Kstream/GlobalKTable.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class GlobalKTable : IGenericSerDesFactoryApplier, IDisposable + public class GlobalKTable : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { Org.Apache.Kafka.Streams.Kstream.GlobalKTable _inner; @@ -42,6 +42,9 @@ internal GlobalKTable(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.Ks _inner = table; } + /// + public Org.Apache.Kafka.Streams.Kstream.GlobalKTable InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/Grouped.cs b/src/net/KNet/Specific/Streams/Kstream/Grouped.cs index 0b28c139c5..2cd07d2143 100644 --- a/src/net/KNet/Specific/Streams/Kstream/Grouped.cs +++ b/src/net/KNet/Specific/Streams/Kstream/Grouped.cs @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams.Kstream /// The value type /// The JVM key typ /// The JVM value type - public class Grouped : IGenericSerDesFactoryApplier, IDisposable + public class Grouped : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Kstream.Grouped _inner; IGenericSerDesFactory _factory; @@ -41,6 +41,9 @@ internal Grouped(Org.Apache.Kafka.Streams.Kstream.Grouped inner) _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.Grouped InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/Joined.cs b/src/net/KNet/Specific/Streams/Kstream/Joined.cs index 72bb6fa0f3..c7e61bc6f9 100644 --- a/src/net/KNet/Specific/Streams/Kstream/Joined.cs +++ b/src/net/KNet/Specific/Streams/Kstream/Joined.cs @@ -31,7 +31,7 @@ namespace MASES.KNet.Streams.Kstream /// The JVM type of /// The JVM type of /// The JVM type of - public class Joined : IGenericSerDesFactoryApplier, IDisposable + public class Joined : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Kstream.Joined _inner; IGenericSerDesFactory _factory; @@ -42,6 +42,9 @@ internal Joined(Org.Apache.Kafka.Streams.Kstream.Joined in _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.Joined InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/KBranchedKStream.cs b/src/net/KNet/Specific/Streams/Kstream/KBranchedKStream.cs index d38353ecb4..9074ab66ba 100644 --- a/src/net/KNet/Specific/Streams/Kstream/KBranchedKStream.cs +++ b/src/net/KNet/Specific/Streams/Kstream/KBranchedKStream.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class BranchedKStream : IGenericSerDesFactoryApplier, IDisposable + public class BranchedKStream : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { Org.Apache.Kafka.Streams.Kstream.BranchedKStream _inner; @@ -42,6 +42,9 @@ internal BranchedKStream(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.BranchedKStream InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/KGroupedStream.cs b/src/net/KNet/Specific/Streams/Kstream/KGroupedStream.cs index 40dea31568..166f4932e5 100644 --- a/src/net/KNet/Specific/Streams/Kstream/KGroupedStream.cs +++ b/src/net/KNet/Specific/Streams/Kstream/KGroupedStream.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class KGroupedStream : IGenericSerDesFactoryApplier, IDisposable + public class KGroupedStream : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { Org.Apache.Kafka.Streams.Kstream.KGroupedStream _inner; @@ -42,6 +42,9 @@ internal KGroupedStream(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams. _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.KGroupedStream InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/KGroupedTable.cs b/src/net/KNet/Specific/Streams/Kstream/KGroupedTable.cs index 9ee84e5008..1e76be3cba 100644 --- a/src/net/KNet/Specific/Streams/Kstream/KGroupedTable.cs +++ b/src/net/KNet/Specific/Streams/Kstream/KGroupedTable.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class KGroupedTable : IGenericSerDesFactoryApplier, IDisposable + public class KGroupedTable : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { Org.Apache.Kafka.Streams.Kstream.KGroupedTable _inner; @@ -42,9 +42,12 @@ internal KGroupedTable(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.K _inner = inner; } - #region IDisposable + /// + public Org.Apache.Kafka.Streams.Kstream.KGroupedTable InnerReference => _inner; - volatile int _disposed; // 0 = live, 1 = disposed + #region IDisposable + + volatile int _disposed; // 0 = live, 1 = disposed /// /// Test if this instance was disposed /// diff --git a/src/net/KNet/Specific/Streams/Kstream/KStream.cs b/src/net/KNet/Specific/Streams/Kstream/KStream.cs index 43247b1af5..3ab2f19eae 100644 --- a/src/net/KNet/Specific/Streams/Kstream/KStream.cs +++ b/src/net/KNet/Specific/Streams/Kstream/KStream.cs @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class KStream : IGenericSerDesFactoryApplier, IDisposable + public class KStream : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { Org.Apache.Kafka.Streams.Kstream.KStream _inner; @@ -43,6 +43,9 @@ internal KStream(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.Kstream _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.KStream InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/KTable.cs b/src/net/KNet/Specific/Streams/Kstream/KTable.cs index aaea4b5580..0e5ac0ee71 100644 --- a/src/net/KNet/Specific/Streams/Kstream/KTable.cs +++ b/src/net/KNet/Specific/Streams/Kstream/KTable.cs @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class KTable : IGenericSerDesFactoryApplier, IDisposable + public class KTable : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Kstream.KTable _inner; @@ -43,6 +43,9 @@ internal KTable(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.Kstream. _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.KTable InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/Printed.cs b/src/net/KNet/Specific/Streams/Kstream/Printed.cs index 6f6011c380..6ff1c5e8f3 100644 --- a/src/net/KNet/Specific/Streams/Kstream/Printed.cs +++ b/src/net/KNet/Specific/Streams/Kstream/Printed.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class Printed : IGenericSerDesFactoryApplier, IDisposable + public class Printed : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Kstream.Printed _inner; IGenericSerDesFactory _factory; @@ -40,6 +40,9 @@ internal Printed(Org.Apache.Kafka.Streams.Kstream.Printed inner) _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.Printed InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/Produced.cs b/src/net/KNet/Specific/Streams/Kstream/Produced.cs index a1c9707128..b8e728b3a8 100644 --- a/src/net/KNet/Specific/Streams/Kstream/Produced.cs +++ b/src/net/KNet/Specific/Streams/Kstream/Produced.cs @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class Produced : IGenericSerDesFactoryApplier, IDisposable + public class Produced : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { StreamPartitioner _streamPartitioner = null; readonly Org.Apache.Kafka.Streams.Kstream.Produced _inner; @@ -51,6 +51,9 @@ internal Produced(Org.Apache.Kafka.Streams.Kstream.Produced inner, _streamPartitioner = streamPartitioner; } + /// + public Org.Apache.Kafka.Streams.Kstream.Produced InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/Repartitioned.cs b/src/net/KNet/Specific/Streams/Kstream/Repartitioned.cs index c70911c918..407a44df19 100644 --- a/src/net/KNet/Specific/Streams/Kstream/Repartitioned.cs +++ b/src/net/KNet/Specific/Streams/Kstream/Repartitioned.cs @@ -31,7 +31,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class Repartitioned : IGenericSerDesFactoryApplier, IDisposable + public class Repartitioned : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { StreamPartitioner _streamPartitioner = null; readonly Org.Apache.Kafka.Streams.Kstream.Repartitioned _inner; @@ -52,6 +52,9 @@ IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory _streamPartitioner = streamPartitioner; } + /// + public Org.Apache.Kafka.Streams.Kstream.Repartitioned InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/SessionWindowedCogroupedKStream.cs b/src/net/KNet/Specific/Streams/Kstream/SessionWindowedCogroupedKStream.cs index 4a3a32d6c8..fcfa3f4e2b 100644 --- a/src/net/KNet/Specific/Streams/Kstream/SessionWindowedCogroupedKStream.cs +++ b/src/net/KNet/Specific/Streams/Kstream/SessionWindowedCogroupedKStream.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class SessionWindowedCogroupedKStream : IGenericSerDesFactoryApplier, IDisposable + public class SessionWindowedCogroupedKStream : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Kstream.SessionWindowedCogroupedKStream _inner; @@ -42,6 +42,9 @@ internal SessionWindowedCogroupedKStream(IGenericSerDesFactory factory, Org.Apac _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.SessionWindowedCogroupedKStream InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/SessionWindowedKStream.cs b/src/net/KNet/Specific/Streams/Kstream/SessionWindowedKStream.cs index 664bcf1a7f..f1442280fb 100644 --- a/src/net/KNet/Specific/Streams/Kstream/SessionWindowedKStream.cs +++ b/src/net/KNet/Specific/Streams/Kstream/SessionWindowedKStream.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class SessionWindowedKStream : IGenericSerDesFactoryApplier, IDisposable + public class SessionWindowedKStream : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { Org.Apache.Kafka.Streams.Kstream.SessionWindowedKStream _inner; @@ -42,6 +42,9 @@ internal SessionWindowedKStream(IGenericSerDesFactory factory, Org.Apache.Kafka. _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.SessionWindowedKStream InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/StreamJoined.cs b/src/net/KNet/Specific/Streams/Kstream/StreamJoined.cs index 7f0b1d698a..127cabdd2c 100644 --- a/src/net/KNet/Specific/Streams/Kstream/StreamJoined.cs +++ b/src/net/KNet/Specific/Streams/Kstream/StreamJoined.cs @@ -31,7 +31,7 @@ namespace MASES.KNet.Streams.Kstream /// The JVM type of /// The JVM type of /// The JVM type of - public class StreamJoined : IGenericSerDesFactoryApplier, IDisposable + public class StreamJoined : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Kstream.StreamJoined _inner; IGenericSerDesFactory _factory; @@ -42,6 +42,9 @@ public class StreamJoined : IGenericSerDesFact _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.StreamJoined InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/Suppressed.cs b/src/net/KNet/Specific/Streams/Kstream/Suppressed.cs index c95cc6c610..6c6b59cdac 100644 --- a/src/net/KNet/Specific/Streams/Kstream/Suppressed.cs +++ b/src/net/KNet/Specific/Streams/Kstream/Suppressed.cs @@ -27,7 +27,7 @@ namespace MASES.KNet.Streams.Kstream /// /// /// The JVM type of - public class Suppressed : IGenericSerDesFactoryApplier, IDisposable + public class Suppressed : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Kstream.Suppressed _inner; IGenericSerDesFactory _factory; @@ -38,6 +38,9 @@ internal Suppressed(Org.Apache.Kafka.Streams.Kstream.Suppressed inner) _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.Suppressed InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/TableJoined.cs b/src/net/KNet/Specific/Streams/Kstream/TableJoined.cs index 3af35c1f9b..934638334e 100644 --- a/src/net/KNet/Specific/Streams/Kstream/TableJoined.cs +++ b/src/net/KNet/Specific/Streams/Kstream/TableJoined.cs @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class TableJoined : IGenericSerDesFactoryApplier, IDisposable + public class TableJoined : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Kstream.TableJoined _inner; IGenericSerDesFactory _factory; @@ -41,6 +41,9 @@ internal TableJoined(Org.Apache.Kafka.Streams.Kstream.TableJoined _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.TableJoined InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/TimeWindowedCogroupedKStream.cs b/src/net/KNet/Specific/Streams/Kstream/TimeWindowedCogroupedKStream.cs index 5070065c62..0923632d44 100644 --- a/src/net/KNet/Specific/Streams/Kstream/TimeWindowedCogroupedKStream.cs +++ b/src/net/KNet/Specific/Streams/Kstream/TimeWindowedCogroupedKStream.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class TimeWindowedCogroupedKStream : IGenericSerDesFactoryApplier, IDisposable + public class TimeWindowedCogroupedKStream : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Kstream.TimeWindowedCogroupedKStream _inner; @@ -42,6 +42,9 @@ internal TimeWindowedCogroupedKStream(IGenericSerDesFactory factory, Org.Apache. _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.TimeWindowedCogroupedKStream InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/TimeWindowedKStream.cs b/src/net/KNet/Specific/Streams/Kstream/TimeWindowedKStream.cs index 8ddb18488f..42d9fa4999 100644 --- a/src/net/KNet/Specific/Streams/Kstream/TimeWindowedKStream.cs +++ b/src/net/KNet/Specific/Streams/Kstream/TimeWindowedKStream.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The JVM type of /// The JVM type of - public class TimeWindowedKStream : IGenericSerDesFactoryApplier, IDisposable + public class TimeWindowedKStream : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { Org.Apache.Kafka.Streams.Kstream.TimeWindowedKStream _inner; @@ -42,6 +42,9 @@ internal TimeWindowedKStream(IGenericSerDesFactory factory, Org.Apache.Kafka.Str _inner = inner; } + /// + public Org.Apache.Kafka.Streams.Kstream.TimeWindowedKStream InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Kstream/Windowed.cs b/src/net/KNet/Specific/Streams/Kstream/Windowed.cs index c38b15834c..026e60cf1f 100644 --- a/src/net/KNet/Specific/Streams/Kstream/Windowed.cs +++ b/src/net/KNet/Specific/Streams/Kstream/Windowed.cs @@ -27,7 +27,7 @@ namespace MASES.KNet.Streams.Kstream /// /// The key type /// The JVM type of - public class Windowed : IGenericSerDesFactoryApplier, IDisposable + public class Windowed : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Kstream.Windowed _inner; ISerDes _keySerDes = null; @@ -40,6 +40,9 @@ internal Windowed(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.Kstrea _inner = windowed; } + /// + public Org.Apache.Kafka.Streams.Kstream.Windowed InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/Processor/Api/ProcessorContext.cs b/src/net/KNet/Specific/Streams/Processor/Api/ProcessorContext.cs index f3a47ba6cb..9e33814efb 100644 --- a/src/net/KNet/Specific/Streams/Processor/Api/ProcessorContext.cs +++ b/src/net/KNet/Specific/Streams/Processor/Api/ProcessorContext.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Processor.Api /// /// The JVM type of /// The JVM type of - public class ProcessorContext : IDisposable + public class ProcessorContext : IKNetInnerReference>, IDisposable { readonly Org.Apache.Kafka.Streams.Processor.Api.ProcessorContext _context; @@ -38,9 +38,12 @@ internal ProcessorContext(Org.Apache.Kafka.Streams.Processor.Api.ProcessorContex _context = context; } - #region IDisposable + /// + public Org.Apache.Kafka.Streams.Processor.Api.ProcessorContext InnerReference => _context; - volatile int _disposed; // 0 = live, 1 = disposed + #region IDisposable + + volatile int _disposed; // 0 = live, 1 = disposed /// /// Test if this instance was disposed /// diff --git a/src/net/KNet/Specific/Streams/Processor/Api/Record.cs b/src/net/KNet/Specific/Streams/Processor/Api/Record.cs index 1488c019e3..dc9bdff940 100644 --- a/src/net/KNet/Specific/Streams/Processor/Api/Record.cs +++ b/src/net/KNet/Specific/Streams/Processor/Api/Record.cs @@ -30,8 +30,12 @@ namespace MASES.KNet.Streams.Processor.Api /// The value type /// The JVM type of /// The JVM type of - public class Record : IDisposable + public class Record : IKNetInnerReference>, IDisposable { + readonly IGenericSerDesFactory _builder; + readonly Org.Apache.Kafka.Streams.Processor.Api.Record _record; + readonly Org.Apache.Kafka.Streams.Processor.Api.RecordMetadata _metadata; + internal Record(IGenericSerDesFactory builder, Org.Apache.Kafka.Streams.Processor.Api.Record record, Org.Apache.Kafka.Streams.Processor.Api.RecordMetadata metadata) { _builder = builder; @@ -39,9 +43,8 @@ internal Record(IGenericSerDesFactory builder, Org.Apache.Kafka.Streams.Processo _metadata = metadata; } - readonly IGenericSerDesFactory _builder; - readonly Org.Apache.Kafka.Streams.Processor.Api.Record _record; - readonly Org.Apache.Kafka.Streams.Processor.Api.RecordMetadata _metadata; + /// + public Org.Apache.Kafka.Streams.Processor.Api.Record InnerReference => _record; #region IDisposable diff --git a/src/net/KNet/Specific/Streams/State/KeyValueIterator.cs b/src/net/KNet/Specific/Streams/State/KeyValueIterator.cs index 715f57752e..38249ffc1b 100644 --- a/src/net/KNet/Specific/Streams/State/KeyValueIterator.cs +++ b/src/net/KNet/Specific/Streams/State/KeyValueIterator.cs @@ -33,7 +33,7 @@ namespace MASES.KNet.Streams.State /// The value type /// The JVM type of /// The JVM type of - public sealed class KeyValueIterator : CommonIterator> + public sealed class KeyValueIterator : CommonIterator>, IKNetInnerReference> { #if NET7_0_OR_GREATER sealed class PrefetchableLocalEnumerator(IGenericSerDesFactory factory, @@ -133,6 +133,9 @@ internal KeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafka.Stream _iterator = iterator; } + /// + public Org.Apache.Kafka.Streams.State.KeyValueIterator InnerReference => _iterator; + /// protected override void Dispose(bool disposing) { diff --git a/src/net/KNet/Specific/Streams/State/TimestampedKeyValueIterator.cs b/src/net/KNet/Specific/Streams/State/TimestampedKeyValueIterator.cs index a243d87924..743c16f442 100644 --- a/src/net/KNet/Specific/Streams/State/TimestampedKeyValueIterator.cs +++ b/src/net/KNet/Specific/Streams/State/TimestampedKeyValueIterator.cs @@ -33,7 +33,7 @@ namespace MASES.KNet.Streams.State /// The value type /// The JVM type of /// The JVM type of - public sealed class TimestampedKeyValueIterator : CommonIterator> + public sealed class TimestampedKeyValueIterator : CommonIterator>, IKNetInnerReference>> { #if NET7_0_OR_GREATER sealed class PrefetchableLocalEnumerator(IGenericSerDesFactory factory, @@ -125,6 +125,9 @@ internal TimestampedKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.K _iterator = iterator; } + /// + public Org.Apache.Kafka.Streams.State.KeyValueIterator> InnerReference => _iterator; + /// protected override void Dispose(bool disposing) { diff --git a/src/net/KNet/Specific/Streams/State/TimestampedWindowedKeyValueIterator.cs b/src/net/KNet/Specific/Streams/State/TimestampedWindowedKeyValueIterator.cs index c4b96160c1..f6f9a8130a 100644 --- a/src/net/KNet/Specific/Streams/State/TimestampedWindowedKeyValueIterator.cs +++ b/src/net/KNet/Specific/Streams/State/TimestampedWindowedKeyValueIterator.cs @@ -34,7 +34,7 @@ namespace MASES.KNet.Streams.State /// The value type /// The JVM type of /// The JVM type of - public sealed class TimestampedWindowedKeyValueIterator : CommonIterator> + public sealed class TimestampedWindowedKeyValueIterator : CommonIterator>, IKNetInnerReference, Org.Apache.Kafka.Streams.State.ValueAndTimestamp>> { #if NET7_0_OR_GREATER sealed class PrefetchableLocalEnumerator(IGenericSerDesFactory factory, @@ -117,6 +117,9 @@ internal TimestampedWindowedKeyValueIterator(IGenericSerDesFactory factory, Org. _iterator = iterator; } + /// + public Org.Apache.Kafka.Streams.State.KeyValueIterator, Org.Apache.Kafka.Streams.State.ValueAndTimestamp> InnerReference => _iterator; + /// protected override void Dispose(bool disposing) { diff --git a/src/net/KNet/Specific/Streams/State/ValueAndTimestamp.cs b/src/net/KNet/Specific/Streams/State/ValueAndTimestamp.cs index e393d2cf98..31299c577c 100644 --- a/src/net/KNet/Specific/Streams/State/ValueAndTimestamp.cs +++ b/src/net/KNet/Specific/Streams/State/ValueAndTimestamp.cs @@ -29,9 +29,9 @@ namespace MASES.KNet.Streams.State /// /// The value type /// The JVM type of - public class ValueAndTimestamp : IGenericSerDesFactoryApplier, IDisposable + public class ValueAndTimestamp : IKNetInnerReference>, IGenericSerDesFactoryApplier, IDisposable { - readonly Org.Apache.Kafka.Streams.State.ValueAndTimestamp _valueAndTimestamp; + readonly Org.Apache.Kafka.Streams.State.ValueAndTimestamp _inner; ISerDes _valueSerDes; IGenericSerDesFactory _factory; IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set => _factory = value; } @@ -39,9 +39,12 @@ public class ValueAndTimestamp : IGenericSerDesFactoryApplier, IDispos internal ValueAndTimestamp(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.State.ValueAndTimestamp valueAndTimestamp) { _factory = factory; - _valueAndTimestamp = valueAndTimestamp; + _inner = valueAndTimestamp; } + /// + public Org.Apache.Kafka.Streams.State.ValueAndTimestamp InnerReference => _inner; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed @@ -70,7 +73,7 @@ protected virtual void Dispose(bool disposing) if (disposing) { - _valueAndTimestamp?.Dispose(); + _inner?.Dispose(); } } @@ -80,7 +83,7 @@ protected virtual void Dispose(bool disposing) /// /// /// - public long Timestamp { get { CheckDisposed(); return _valueAndTimestamp.Timestamp(); } } + public long Timestamp { get { CheckDisposed(); return _inner.Timestamp(); } } /// /// /// @@ -96,7 +99,7 @@ public V Value { CheckDisposed(); _valueSerDes ??= _factory?.BuildKeySerDes(); - var vv = _valueAndTimestamp.Value(); + var vv = _inner.Value(); using var disposable0 = vv as IDisposable; return _valueSerDes.Deserialize(null, vv); diff --git a/src/net/KNet/Specific/Streams/State/WindowedKeyValueIterator.cs b/src/net/KNet/Specific/Streams/State/WindowedKeyValueIterator.cs index dd6d219e5b..9abfd99394 100644 --- a/src/net/KNet/Specific/Streams/State/WindowedKeyValueIterator.cs +++ b/src/net/KNet/Specific/Streams/State/WindowedKeyValueIterator.cs @@ -34,7 +34,7 @@ namespace MASES.KNet.Streams.State /// The value type /// The JVM type of /// The JVM type of - public sealed class WindowedKeyValueIterator : CommonIterator> + public sealed class WindowedKeyValueIterator : CommonIterator>, IKNetInnerReference, TJVMV>> { #if NET7_0_OR_GREATER sealed class PrefetchableLocalEnumerator(IGenericSerDesFactory factory, @@ -127,6 +127,9 @@ internal WindowedKeyValueIterator(IGenericSerDesFactory factory, Org.Apache.Kafk _iterator = iterator; } + /// + public Org.Apache.Kafka.Streams.State.KeyValueIterator, TJVMV> InnerReference => _iterator; + /// protected override void Dispose(bool disposing) { diff --git a/src/net/KNet/Specific/Streams/StreamsBuilder.cs b/src/net/KNet/Specific/Streams/StreamsBuilder.cs index 42bc5cee51..e340666a2f 100644 --- a/src/net/KNet/Specific/Streams/StreamsBuilder.cs +++ b/src/net/KNet/Specific/Streams/StreamsBuilder.cs @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams /// /// KNet extension of /// - public class StreamsBuilder : IGenericSerDesFactoryApplier, IDisposable + public class StreamsBuilder : IKNetInnerReference, IGenericSerDesFactoryApplier, IDisposable { Org.Apache.Kafka.Streams.StreamsBuilder _builder; IGenericSerDesFactory _factory; @@ -55,6 +55,9 @@ public StreamsBuilder(TopologyConfig arg0) #endregion + /// + public Org.Apache.Kafka.Streams.StreamsBuilder InnerReference => _builder; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/TimestampedKeyValue.cs b/src/net/KNet/Specific/Streams/TimestampedKeyValue.cs index 1f677848bf..7bb6ddb07c 100644 --- a/src/net/KNet/Specific/Streams/TimestampedKeyValue.cs +++ b/src/net/KNet/Specific/Streams/TimestampedKeyValue.cs @@ -19,6 +19,7 @@ using MASES.KNet.Serialization; using MASES.KNet.Streams.Processor.Api; using MASES.KNet.Streams.State; +using Org.Apache.Kafka.Streams.State; using System; using System.Threading; @@ -31,7 +32,7 @@ namespace MASES.KNet.Streams /// The value type /// The JVM type of /// The JVM type of - public sealed class TimestampedKeyValue : IGenericSerDesFactoryApplier, IDisposable + public sealed class TimestampedKeyValue : IKNetInnerReference>>, IGenericSerDesFactoryApplier, IDisposable { readonly KeyValueSupport> _inner = null; @@ -60,6 +61,9 @@ internal TimestampedKeyValue(IGenericSerDesFactory factory, } } + /// + public KeyValueSupport> InnerReference => _inner; + volatile int _disposed; // 0 = live, 1 = disposed /// /// Test if this instance was disposed diff --git a/src/net/KNet/Specific/Streams/TimestampedWindowedKeyValue.cs b/src/net/KNet/Specific/Streams/TimestampedWindowedKeyValue.cs index eb81c88ad2..33e47381fb 100644 --- a/src/net/KNet/Specific/Streams/TimestampedWindowedKeyValue.cs +++ b/src/net/KNet/Specific/Streams/TimestampedWindowedKeyValue.cs @@ -32,7 +32,7 @@ namespace MASES.KNet.Streams /// The value type /// The JVM type of /// The JVM type of - public sealed class TimestampedWindowedKeyValue : IGenericSerDesFactoryApplier, IDisposable + public sealed class TimestampedWindowedKeyValue : IKNetInnerReference, Org.Apache.Kafka.Streams.State.ValueAndTimestamp>>, IGenericSerDesFactoryApplier, IDisposable { readonly KeyValueSupport, Org.Apache.Kafka.Streams.State.ValueAndTimestamp> _valueInner; Windowed _key = null; @@ -47,6 +47,9 @@ internal TimestampedWindowedKeyValue(IGenericSerDesFactory factory, _valueInner = value; } + /// + public KeyValueSupport, Org.Apache.Kafka.Streams.State.ValueAndTimestamp> InnerReference => _valueInner; + volatile int _disposed; // 0 = live, 1 = disposed /// /// Test if this instance was disposed diff --git a/src/net/KNet/Specific/Streams/Topology.cs b/src/net/KNet/Specific/Streams/Topology.cs index 22c12677d9..8430c20f81 100644 --- a/src/net/KNet/Specific/Streams/Topology.cs +++ b/src/net/KNet/Specific/Streams/Topology.cs @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams /// /// KNet implementation of /// - public class Topology : IGenericSerDesFactoryApplier, IDisposable + public class Topology : IKNetInnerReference, IGenericSerDesFactoryApplier, IDisposable { readonly Org.Apache.Kafka.Streams.Topology _topology; IGenericSerDesFactory _factory; @@ -55,6 +55,9 @@ internal Topology(Org.Apache.Kafka.Streams.Topology topology, IGenericSerDesFact #endregion + /// + public Org.Apache.Kafka.Streams.Topology InnerReference => _topology; + #region IDisposable volatile int _disposed; // 0 = live, 1 = disposed diff --git a/src/net/KNet/Specific/Streams/WindowedKeyValue.cs b/src/net/KNet/Specific/Streams/WindowedKeyValue.cs index 762258924d..160a9b8b76 100644 --- a/src/net/KNet/Specific/Streams/WindowedKeyValue.cs +++ b/src/net/KNet/Specific/Streams/WindowedKeyValue.cs @@ -19,6 +19,7 @@ using MASES.KNet.Serialization; using MASES.KNet.Streams.Kstream; using MASES.KNet.Streams.Processor.Api; +using Org.Apache.Kafka.Streams; using System; using System.Threading; @@ -31,7 +32,7 @@ namespace MASES.KNet.Streams /// The value type /// The JVM type of /// The JVM type of - public sealed class WindowedKeyValue : IGenericSerDesFactoryApplier, IDisposable + public sealed class WindowedKeyValue : IKNetInnerReference, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable { readonly KeyValueSupport, TJVMV> _valueInner; Windowed _key = null; @@ -57,6 +58,9 @@ internal WindowedKeyValue(IGenericSerDesFactory factory, } } + /// + public KeyValueSupport, TJVMV> InnerReference => _valueInner; + volatile int _disposed; // 0 = live, 1 = disposed /// /// Test if this instance was disposed