Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/net/KNet/Specific/Consumer/ConsumerRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace MASES.KNet.Consumer
/// <typeparam name="V">The value type</typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class ConsumerRecord<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class ConsumerRecord<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Clients.Consumer.ConsumerRecord<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
IDeserializer<K, TJVMK> _keyDeserializer;
IDeserializer<V, TJVMV> _valueDeserializer;
Expand Down Expand Up @@ -65,6 +65,8 @@ internal ConsumerRecord(Org.Apache.Kafka.Clients.Consumer.ConsumerRecord<TJVMK,
_record = record;
_factory = factory;
}
/// <inheritdoc/>
public Org.Apache.Kafka.Clients.Consumer.ConsumerRecord<TJVMK, TJVMV> InnerReference => _record;

volatile int _disposed; // 0 = live, 1 = disposed
/// <summary>
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Consumer/ConsumerRecords.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace MASES.KNet.Consumer
/// <typeparam name="V">The value type</typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class ConsumerRecords<K, V, TJVMK, TJVMV> : IEnumerable<ConsumerRecord<K, V, TJVMK, TJVMV>>, IAsyncEnumerable<ConsumerRecord<K, V, TJVMK, TJVMV>>, IDisposable
public class ConsumerRecords<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Clients.Consumer.ConsumerRecords<TJVMK, TJVMV>>, IEnumerable<ConsumerRecord<K, V, TJVMK, TJVMV>>, IAsyncEnumerable<ConsumerRecord<K, V, TJVMK, TJVMV>>, IDisposable
{
readonly ISerDes<K, TJVMK> _keyDeserializer;
readonly ISerDes<V, TJVMV> _valueDeserializer;
Expand All @@ -49,6 +49,9 @@ internal ConsumerRecords(Org.Apache.Kafka.Clients.Consumer.ConsumerRecords<TJVMK
_valueDeserializer = valueDeserializer;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Clients.Consumer.ConsumerRecords<TJVMK, TJVMV> InnerReference => _records;

volatile int _disposed; // 0 = live, 1 = disposed
/// <summary>
/// Test if this instance was disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Consumer/ConsumerRecordsEnumerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

namespace MASES.KNet.Consumer
{
class ConsumerRecordsEnumerator<K, V, TJVMK, TJVMV> : IEnumerator<ConsumerRecord<K, V, TJVMK, TJVMV>>, IAsyncEnumerator<ConsumerRecord<K, V, TJVMK, TJVMV>>
class ConsumerRecordsEnumerator<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Clients.Consumer.ConsumerRecords<TJVMK, TJVMV>>, IEnumerator<ConsumerRecord<K, V, TJVMK, TJVMV>>, IAsyncEnumerator<ConsumerRecord<K, V, TJVMK, TJVMV>>
{
readonly IDeserializer<K, TJVMK> _keyDeserializer;
readonly IDeserializer<V, TJVMV> _valueDeserializer;
Expand All @@ -50,6 +50,9 @@ public ConsumerRecordsEnumerator(Org.Apache.Kafka.Clients.Consumer.ConsumerRecor
_cancellationToken = cancellationToken;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Clients.Consumer.ConsumerRecords<TJVMK, TJVMV> InnerReference => _records;

ConsumerRecord<K, V, TJVMK, TJVMV> IAsyncEnumerator<ConsumerRecord<K, V, TJVMK, TJVMV>>.Current => new ConsumerRecord<K, V, TJVMK, TJVMV>(_recordAsyncEnumerator.Current, _keyDeserializer, _valueDeserializer, false);

ConsumerRecord<K, V, TJVMK, TJVMV> IEnumerator<ConsumerRecord<K, V, TJVMK, TJVMV>>.Current => new ConsumerRecord<K, V, TJVMK, TJVMV>(_recordEnumerator.Current, _keyDeserializer, _valueDeserializer, false);
Expand Down
38 changes: 38 additions & 0 deletions src/net/KNet/Specific/IKNetInnerReference.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2021-2026 MASES s.r.l.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Refer to LICENSE for more information.
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MASES.KNet
{
/// <summary>
/// Defines a way to retrieve the underlying inner reference used from KNet object instance
/// </summary>
/// <typeparam name="T">The inner reference type</typeparam>
public interface IKNetInnerReference<T>
{
/// <summary>
/// The underlying inner reference used from KNet object instance
/// </summary>
T InnerReference { get; }
}
}
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/KNetStreams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace MASES.KNet.Streams
/// <summary>
/// KNet extension of <see cref="Org.Apache.Kafka.Streams.KafkaStreams"/>
/// </summary>
public class KNetStreams : IGenericSerDesFactoryApplier, IDisposable
public class KNetStreams : IKNetInnerReference<Org.Apache.Kafka.Streams.KafkaStreams>, IGenericSerDesFactoryApplier, IDisposable
{
readonly Java.Util.Properties _properties;
readonly Org.Apache.Kafka.Streams.KafkaStreams _inner;
Expand Down Expand Up @@ -94,6 +94,9 @@ public KNetStreams(Topology arg0, StreamsConfigBuilder arg1)
}
#endregion

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.KafkaStreams InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/KeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams
/// <typeparam name="V">The value type</typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public sealed class KeyValue<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public sealed class KeyValue<K, V, TJVMK, TJVMV> : IKNetInnerReference<KeyValueSupport<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
readonly KeyValueSupport<TJVMK, TJVMV> _inner = null;
K _key;
Expand Down Expand Up @@ -68,6 +68,9 @@ internal KeyValue(IGenericSerDesFactory factory,
}
}

/// <inheritdoc/>
public KeyValueSupport<TJVMK, TJVMV> InnerReference => _inner;

volatile int _disposed; // 0 = live, 1 = disposed
/// <summary>
/// Test if this instance was disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/Branched.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class Branched<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class Branched<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.Branched<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
readonly Org.Apache.Kafka.Streams.Kstream.Branched<TJVMK, TJVMV> _inner;
IGenericSerDesFactory _factory;
Expand All @@ -42,6 +42,9 @@ public class Branched<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDispo
_inner = inner;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.Branched<TJVMK, TJVMV> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/CogroupedKStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="VOut"></typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMVOut">The JVM type of <typeparamref name="VOut"/></typeparam>
public class CogroupedKStream<K, VOut, TJVMK, TJVMVOut> : IGenericSerDesFactoryApplier, IDisposable
public class CogroupedKStream<K, VOut, TJVMK, TJVMVOut> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.CogroupedKStream<TJVMK, TJVMVOut>>, IGenericSerDesFactoryApplier, IDisposable
{
Org.Apache.Kafka.Streams.Kstream.CogroupedKStream<TJVMK, TJVMVOut> _inner;

Expand All @@ -43,6 +43,9 @@ internal CogroupedKStream(IGenericSerDesFactory factory, Org.Apache.Kafka.Stream
_inner = inner;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.CogroupedKStream<TJVMK, TJVMVOut> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/Consumed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class Consumed<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class Consumed<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.Consumed<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
ISerDes<K, TJVMK> _keySerdes;
ISerDes<V, TJVMV> _valueSerdes;
Expand All @@ -54,6 +54,9 @@ IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory
_inner = inner;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.Consumed<TJVMK, TJVMV> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/GlobalKTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class GlobalKTable<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class GlobalKTable<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.GlobalKTable<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
Org.Apache.Kafka.Streams.Kstream.GlobalKTable<TJVMK, TJVMV> _inner;

Expand All @@ -42,6 +42,9 @@ internal GlobalKTable(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.Ks
_inner = table;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.GlobalKTable<TJVMK, TJVMV> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/Grouped.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V">The value type</typeparam>
/// <typeparam name="TJVMK">The JVM key typ</typeparam>
/// <typeparam name="TJVMV">The JVM value type</typeparam>
public class Grouped<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class Grouped<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.Grouped<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
readonly Org.Apache.Kafka.Streams.Kstream.Grouped<TJVMK, TJVMV> _inner;
IGenericSerDesFactory _factory;
Expand All @@ -41,6 +41,9 @@ internal Grouped(Org.Apache.Kafka.Streams.Kstream.Grouped<TJVMK, TJVMV> inner)
_inner = inner;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.Grouped<TJVMK, TJVMV> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/Joined.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
/// <typeparam name="TJVMVO">The JVM type of <typeparamref name="VO"/></typeparam>
public class Joined<K, V, VO, TJVMK, TJVMV, TJVMVO> : IGenericSerDesFactoryApplier, IDisposable
public class Joined<K, V, VO, TJVMK, TJVMV, TJVMVO> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.Joined<TJVMK, TJVMV, TJVMVO>>, IGenericSerDesFactoryApplier, IDisposable
{
readonly Org.Apache.Kafka.Streams.Kstream.Joined<TJVMK, TJVMV, TJVMVO> _inner;
IGenericSerDesFactory _factory;
Expand All @@ -42,6 +42,9 @@ internal Joined(Org.Apache.Kafka.Streams.Kstream.Joined<TJVMK, TJVMV, TJVMVO> in
_inner = inner;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.Joined<TJVMK, TJVMV, TJVMVO> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/KBranchedKStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class BranchedKStream<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class BranchedKStream<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.BranchedKStream<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
Org.Apache.Kafka.Streams.Kstream.BranchedKStream<TJVMK, TJVMV> _inner;

Expand All @@ -42,6 +42,9 @@ internal BranchedKStream(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams
_inner = inner;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.BranchedKStream<TJVMK, TJVMV> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/KGroupedStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class KGroupedStream<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class KGroupedStream<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.KGroupedStream<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
Org.Apache.Kafka.Streams.Kstream.KGroupedStream<TJVMK, TJVMV> _inner;

Expand All @@ -42,6 +42,9 @@ internal KGroupedStream(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.
_inner = inner;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.KGroupedStream<TJVMK, TJVMV> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
9 changes: 6 additions & 3 deletions src/net/KNet/Specific/Streams/Kstream/KGroupedTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class KGroupedTable<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class KGroupedTable<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.KGroupedTable<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
Org.Apache.Kafka.Streams.Kstream.KGroupedTable<TJVMK, TJVMV> _inner;

Expand All @@ -42,9 +42,12 @@ internal KGroupedTable(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.K
_inner = inner;
}

#region IDisposable
/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.KGroupedTable<TJVMK, TJVMV> InnerReference => _inner;

volatile int _disposed; // 0 = live, 1 = disposed
#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
/// <summary>
/// Test if this instance was disposed
/// </summary>
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/KStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class KStream<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class KStream<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.KStream<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
Org.Apache.Kafka.Streams.Kstream.KStream<TJVMK, TJVMV> _inner;

Expand All @@ -43,6 +43,9 @@ internal KStream(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.Kstream
_inner = inner;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.KStream<TJVMK, TJVMV> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/KTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class KTable<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class KTable<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.KTable<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
readonly Org.Apache.Kafka.Streams.Kstream.KTable<TJVMK, TJVMV> _inner;

Expand All @@ -43,6 +43,9 @@ internal KTable(IGenericSerDesFactory factory, Org.Apache.Kafka.Streams.Kstream.
_inner = inner;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.KTable<TJVMK, TJVMV> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/Printed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class Printed<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class Printed<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.Printed<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
readonly Org.Apache.Kafka.Streams.Kstream.Printed<TJVMK, TJVMV> _inner;
IGenericSerDesFactory _factory;
Expand All @@ -40,6 +40,9 @@ internal Printed(Org.Apache.Kafka.Streams.Kstream.Printed<TJVMK, TJVMV> inner)
_inner = inner;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.Printed<TJVMK, TJVMV> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
5 changes: 4 additions & 1 deletion src/net/KNet/Specific/Streams/Kstream/Produced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace MASES.KNet.Streams.Kstream
/// <typeparam name="V"></typeparam>
/// <typeparam name="TJVMK">The JVM type of <typeparamref name="K"/></typeparam>
/// <typeparam name="TJVMV">The JVM type of <typeparamref name="V"/></typeparam>
public class Produced<K, V, TJVMK, TJVMV> : IGenericSerDesFactoryApplier, IDisposable
public class Produced<K, V, TJVMK, TJVMV> : IKNetInnerReference<Org.Apache.Kafka.Streams.Kstream.Produced<TJVMK, TJVMV>>, IGenericSerDesFactoryApplier, IDisposable
{
StreamPartitioner<K, V, TJVMK, TJVMV> _streamPartitioner = null;
readonly Org.Apache.Kafka.Streams.Kstream.Produced<TJVMK, TJVMV> _inner;
Expand All @@ -51,6 +51,9 @@ internal Produced(Org.Apache.Kafka.Streams.Kstream.Produced<TJVMK, TJVMV> inner,
_streamPartitioner = streamPartitioner;
}

/// <inheritdoc/>
public Org.Apache.Kafka.Streams.Kstream.Produced<TJVMK, TJVMV> InnerReference => _inner;

#region IDisposable

volatile int _disposed; // 0 = live, 1 = disposed
Expand Down
Loading
Loading