Skip to content

Commit 656e924

Browse files
claudevdmClaude
andauthored
Encode paneinfo with PaneInfoCoder. (#34824) (#34864)
* Encode paneinfo with PaneInfoCoder. * Fix tests. * Fix import. * Add typecoder test. * Implement eq and hash. --------- Co-authored-by: Claude <[email protected]>
1 parent e7098bd commit 656e924

File tree

4 files changed

+55
-1
lines changed

4 files changed

+55
-1
lines changed

sdks/python/apache_beam/coders/coders.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
from apache_beam.portability.api import beam_runner_api_pb2
6565
from apache_beam.typehints import typehints
6666
from apache_beam.utils import proto_utils
67+
from apache_beam.utils import windowed_value
6768

6869
if TYPE_CHECKING:
6970
from apache_beam.coders.typecoders import CoderRegistry
@@ -113,7 +114,8 @@
113114
'WindowedValueCoder',
114115
'ParamWindowedValueCoder',
115116
'BigIntegerCoder',
116-
'DecimalCoder'
117+
'DecimalCoder',
118+
'PaneInfoCoder'
117119
]
118120

119121
T = TypeVar('T')
@@ -1753,6 +1755,24 @@ def __hash__(self):
17531755
return hash(type(self))
17541756

17551757

1758+
class PaneInfoCoder(FastCoder):
1759+
def _create_impl(self):
1760+
return coder_impl.PaneInfoCoderImpl()
1761+
1762+
def is_deterministic(self):
1763+
# type: () -> bool
1764+
return True
1765+
1766+
def to_type_hint(self):
1767+
return windowed_value.PaneInfo
1768+
1769+
def __eq__(self, other):
1770+
return type(self) == type(other)
1771+
1772+
def __hash__(self):
1773+
return hash(type(self))
1774+
1775+
17561776
class DecimalCoder(FastCoder):
17571777
def _create_impl(self):
17581778
return coder_impl.DecimalCoderImpl()

sdks/python/apache_beam/coders/coders_test_common.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,18 @@ def test_interval_window_coder(self):
362362
coders.TupleCoder((coders.IntervalWindowCoder(), )),
363363
(window.IntervalWindow(0, 10), ))
364364

365+
def test_paneinfo_window_coder(self):
366+
self.check_coder(
367+
coders.PaneInfoCoder(),
368+
*[
369+
windowed_value.PaneInfo(
370+
is_first=y == 0,
371+
is_last=y == 9,
372+
timing=windowed_value.PaneInfoTiming.EARLY,
373+
index=y,
374+
nonspeculative_index=-1) for y in range(0, 10)
375+
])
376+
365377
def test_timestamp_coder(self):
366378
self.check_coder(
367379
coders.TimestampCoder(),
@@ -539,6 +551,7 @@ def test_windowed_value_coder(self):
539551
def test_param_windowed_value_coder(self):
540552
from apache_beam.transforms.window import IntervalWindow
541553
from apache_beam.utils.windowed_value import PaneInfo
554+
# pylint: disable=too-many-function-args
542555
wv = windowed_value.create(
543556
b'',
544557
# Milliseconds to microseconds

sdks/python/apache_beam/coders/typecoders.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def MakeXyzs(v):
7373

7474
from apache_beam.coders import coders
7575
from apache_beam.typehints import typehints
76+
from apache_beam.utils import windowed_value
7677

7778
__all__ = ['registry']
7879

@@ -92,6 +93,7 @@ def register_standard_coders(self, fallback_coder):
9293
self._register_coder_internal(bytes, coders.BytesCoder)
9394
self._register_coder_internal(bool, coders.BooleanCoder)
9495
self._register_coder_internal(str, coders.StrUtf8Coder)
96+
self._register_coder_internal(windowed_value.PaneInfo, coders.PaneInfoCoder)
9597
self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
9698
self._register_coder_internal(typehints.DictConstraint, coders.MapCoder)
9799
self._register_coder_internal(

sdks/python/apache_beam/coders/typecoders_test.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from apache_beam.coders import typecoders
2525
from apache_beam.internal import pickler
2626
from apache_beam.typehints import typehints
27+
from apache_beam.utils import windowed_value
2728

2829

2930
class CustomClass(object):
@@ -141,6 +142,24 @@ def test_nullable_coder(self):
141142
self.assertEqual(expected_coder.encode(None), real_coder.encode(None))
142143
self.assertEqual(expected_coder.encode(b'abc'), real_coder.encode(b'abc'))
143144

145+
def test_paneinfo_coder(self):
146+
expected_coder = coders.PaneInfoCoder()
147+
real_coder = typecoders.registry.get_coder(windowed_value.PaneInfo)
148+
self.assertEqual(expected_coder, real_coder)
149+
for i in range(10):
150+
pane_info = windowed_value.PaneInfo(
151+
is_first=i==0,
152+
is_last=i==9,
153+
timing=windowed_value.PaneInfoTiming.EARLY, # 0
154+
index=i,
155+
nonspeculative_index=-1
156+
)
157+
158+
encoded = real_coder.encode(pane_info)
159+
160+
self.assertEqual(expected_coder.encode(pane_info), encoded)
161+
self.assertEqual(pane_info, real_coder.decode(encoded))
162+
144163

145164
if __name__ == '__main__':
146165
unittest.main()

0 commit comments

Comments
 (0)