Rethinking external assets 2 #272

Merged
merged 24 commits on Oct 11, 2023
Changes from 5 commits
Commits (24)
f188425
Implemented external asset structure
evalott100 Jun 15, 2023
463743d
Added Partial typeddicts for use with bluesky
evalott100 Jun 15, 2023
cd601d7
Added Partial typeddicts for use with bluesky
evalott100 Jun 15, 2023
2910dfe
Changed the event counters to increment by seq_num, not data. Changed…
evalott100 Jun 20, 2023
e5ec76c
Changed ComposeStreamResource to only have one ComposeDatum. Changed …
evalott100 Jul 3, 2023
78d95b4
Specifying pydantic<2 in pyproject.toml, also changed stream_resource…
evalott100 Jul 3, 2023
19e6744
Removed content from stream_datum, added a Range Typeddict
evalott100 Jul 3, 2023
6e8a865
Allowed external="STREAM:" keys to not be represented in data in Comp…
evalott100 Jul 4, 2023
959bc59
Fixed typo
evalott100 Sep 14, 2023
79daf00
Fixed imports to avoid pydantic if the version is >=2
evalott100 Jul 4, 2023
1ed861d
We changed != to == when checking datakeys
evalott100 Jul 12, 2023
e1e8065
Made corrections to changes, still need to add descriptor reference t…
evalott100 Jul 13, 2023
60848ee
Added a reference to event-descriptor uid in StreamDatum
evalott100 Jul 20, 2023
7f53948
Made run_start and descriptor required in Resource and StreamDatum re…
evalott100 Jul 20, 2023
f579225
removed block_idx from StreamDatum
evalott100 Aug 14, 2023
9ca7ca7
Made seq_nums optional
evalott100 Sep 11, 2023
a7e80da
Made data_keys a str data_key and moved it to stream resource
evalott100 Sep 14, 2023
0432aeb
Supported empty event pages
evalott100 Sep 28, 2023
6d3d8f2
Ran black (minor change)
evalott100 Sep 29, 2023
8a23344
Made stream_resource::run_start optional and fixed incorrect description
evalott100 Oct 3, 2023
ba9fbcf
Fix typo
coretl Oct 10, 2023
bd90c47
Merge remote-tracking branch 'eva/typo_correction' into rethinking_ex…
coretl Oct 10, 2023
a389141
Missed from the last merge
coretl Oct 10, 2023
e57e53b
Changed doc examples, made run_start NotRequired in resource/stream_r…
evalott100 Oct 11, 2023
104 changes: 61 additions & 43 deletions event_model/__init__.py
@@ -2000,31 +2000,28 @@ def compose_resource(
@dataclass
class ComposeStreamDatum:
stream_resource: StreamResource
stream_name: str
counter: Iterator

def __call__(
self,
datum_kwargs: Dict[str, Any],
data_keys: List[str],
seq_nums: Dict[str, int],
indices: Dict[str, int],
event_count: int = 1,
event_offset: int = 0,
validate: bool = True,
) -> StreamDatum:
resource_uid = self.stream_resource["uid"]
if self.stream_name not in self.stream_resource["stream_names"]:
raise EventModelKeyError(
"Attempt to create stream_datum with name not included"
"in stream_resource"
)
block_idx = next(self.counter)
doc = StreamDatum(
stream_resource=resource_uid,
datum_kwargs=datum_kwargs,
uid=f"{resource_uid}/{self.stream_name}/{block_idx}",
stream_name=self.stream_name,
uid=f"{resource_uid}/{block_idx}",
block_idx=block_idx,
event_count=event_count,
event_offset=event_offset,
data_keys=data_keys,
seq_nums=seq_nums,
indices=indices,
)
if validate:
schema_validators[DocumentNames.stream_datum].validate(doc)
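A minimal sketch of the new call shape (not part of the PR itself): `stream_name`/`datum_kwargs` are gone, a block is described by `data_keys` plus `seq_nums`/`indices` ranges, and the uid is derived from the per-resource counter. The resource document and range values below are hypothetical.

```python
import itertools

from event_model import ComposeStreamDatum

# Hypothetical StreamResource document, trimmed to the fields used here.
stream_resource_doc = {
    "uid": "sr-uid",
    "spec": "AD_HDF5",
    "root": "/data",
    "resource_path": "scan_001.h5",
    "resource_kwargs": {},
    "path_semantics": "posix",
}

composer = ComposeStreamDatum(
    stream_resource=stream_resource_doc,
    counter=itertools.count(),
)
doc = composer(
    data_keys=["det_image"],            # data_keys of the descriptor being streamed
    seq_nums={"start": 1, "stop": 11},  # Event numbers this block covers
    indices={"start": 0, "stop": 10},   # slice handed back to the handler
    validate=False,                     # skip schema validation in this sketch
)
assert doc["uid"] == "sr-uid/0"  # "<stream_resource>/<block_idx>"
```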
@@ -2035,18 +2032,21 @@ def __call__(
def compose_stream_datum(
*,
stream_resource: StreamResource,
stream_name: str,
counter: Iterator,
datum_kwargs: Dict[str, Any],
data_keys: List[str],
seq_nums: Dict[str, int],
indices: Dict[str, int],
event_count: int = 1,
event_offset: int = 0,
validate: bool = True,
) -> StreamDatum:
"""
Here for backwards compatibility, the Compose class is preferred.
"""
return ComposeStreamDatum(stream_resource, stream_name, counter)(
datum_kwargs,
return ComposeStreamDatum(stream_resource, counter)(
data_keys,
seq_nums,
indices,
event_count=event_count,
event_offset=event_offset,
validate=validate,
@@ -2056,7 +2056,7 @@ def compose_stream_datum(
@dataclass
class ComposeStreamResourceBundle:
stream_resource_doc: StreamResource
compose_stream_data: List[ComposeStreamDatum]
compose_stream_data: ComposeStreamDatum

# iter for backwards compatibility
def __iter__(self) -> Iterator:
@@ -2078,33 +2078,19 @@ def __call__(
root: str,
resource_path: str,
resource_kwargs: Dict[str, Any],
stream_names: Union[List, str],
counters: List = [],
path_semantics: Literal["posix", "windows"] = default_path_semantics,
uid: Optional[str] = None,
validate: bool = True,
) -> ComposeStreamResourceBundle:
if uid is None:
uid = str(uuid.uuid4())
if isinstance(stream_names, str):
stream_names = [
stream_names,
]
if len(counters) == 0:
counters = [itertools.count() for _ in stream_names]
elif len(counters) > len(stream_names):
raise ValueError(
"Insufficient number of counters "
f"{len(counters)} for stream names: {stream_names}"
)

doc = StreamResource(
uid=uid,
spec=spec,
root=root,
resource_path=resource_path,
resource_kwargs=resource_kwargs,
stream_names=stream_names,
path_semantics=path_semantics,
)
if self.start:
@@ -2115,14 +2101,10 @@ def __call__(

return ComposeStreamResourceBundle(
doc,
[
ComposeStreamDatum(
stream_resource=doc,
stream_name=stream_name,
counter=counter,
)
for stream_name, counter in zip(stream_names, counters)
],
ComposeStreamDatum(
stream_resource=doc,
counter=itertools.count(),
)
)
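Downstream code now gets exactly one composer from the bundle instead of a list; a sketch, assuming the `__call__` signature above and that the dataclass carries the optional `start` used at `if self.start:` (spec, paths, and ranges hypothetical):

```python
from event_model import ComposeStreamResource

bundle = ComposeStreamResource(start=None)(
    spec="AD_HDF5",
    root="/data",
    resource_path="scan_001.h5",
    resource_kwargs={"frame_per_point": 1},
    validate=False,
)
# One ComposeStreamDatum with its own itertools.count(), rather than
# one composer per stream name.
first_block = bundle.compose_stream_data(
    data_keys=["det_image"],
    seq_nums={"start": 1, "stop": 6},
    indices={"start": 0, "stop": 5},
    validate=False,
)
```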


@@ -2132,7 +2114,6 @@ def compose_stream_resource(
root: str,
resource_path: str,
resource_kwargs: Dict[str, Any],
stream_names: Union[List, str],
counters: List = [],
path_semantics: Literal["posix", "windows"] = default_path_semantics,
start: Optional[RunStart] = None,
@@ -2147,7 +2128,6 @@ def compose_stream_resource(
root,
resource_path,
resource_kwargs,
stream_names,
counters=counters,
path_semantics=path_semantics,
uid=uid,
@@ -2213,6 +2193,20 @@ def compose_stop(
)(exit_status=exit_status, reason=reason, uid=uid, time=time, validate=validate)


def dict_of_lists_has_equal_size_lists(dictionary: Dict[str, List]) -> bool:
"""Return True if all lists are the same size in a Dict[str, List]."""

dictionary_values = iter(dictionary.values())
first_element_len = len(next(dictionary_values))
next_element = next(dictionary_values, None)

while next_element is not None:
if len(next_element) != first_element_len:
return False
next_element = next(dictionary_values, None)
return True
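For instance (hypothetical values; note that an empty list counts as a mismatch):

```python
assert dict_of_lists_has_equal_size_lists({"x": [1, 2], "y": [3, 4]})
assert not dict_of_lists_has_equal_size_lists({"x": [1, 2], "y": [3]})
assert not dict_of_lists_has_equal_size_lists({"x": [1], "y": []})
```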


@dataclass
class ComposeEventPage:
descriptor: EventDescriptor
@@ -2222,12 +2216,30 @@ def __call__(
self,
data: Dict[str, List],
timestamps: Dict[str, Any],
seq_num: List[int],
seq_num: Optional[List[int]] = None,
filled: Optional[Dict[str, List[Union[bool, str]]]] = None,
uid: Optional[List] = None,
time: Optional[List] = None,
validate: bool = True,
) -> EventPage:
assert dict_of_lists_has_equal_size_lists(timestamps), (
"Cannot compose event_page: event_page contains `timestamps` "
"list values of different lengths"
)
assert dict_of_lists_has_equal_size_lists(data), (
"Cannot compose event_page: event_page contains `data` "
"lists of different lengths"
)
assert len(next(iter(timestamps.values()))) == len(next(iter(data.values()))), (
"Cannot compose event_page: the lists in `timestamps` are of a different "
"length to those in `data`"
)

if seq_num is None:
last_seq_num = self.event_counters[self.descriptor["name"]]
seq_num = list(
range(last_seq_num, len(next(iter(data.values()))) + last_seq_num)
)
N = len(seq_num)
if uid is None:
uid = [str(uuid.uuid4()) for _ in range(N)]
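When `seq_num` is omitted, it is derived from the per-stream counter; a worked sketch of that fallback (counter value and data hypothetical):

```python
# With event_counters == {"primary": 4} and three rows of data, the
# fallback yields seq_num == [4, 5, 6]; the counter later advances by
# len(seq_num) to 7.
event_counters = {"primary": 4}
data = {"x": [0.1, 0.2, 0.3]}
last_seq_num = event_counters["primary"]
seq_num = list(range(last_seq_num, len(next(iter(data.values()))) + last_seq_num))
assert seq_num == [4, 5, 6]
```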
@@ -2264,7 +2276,7 @@ def __call__(
"Keys in event['filled'] {} must be a subset of those in "
"event['data'] {}".format(filled.keys(), data.keys())
)
self.event_counters[self.descriptor["name"]] += len(data)
self.event_counters[self.descriptor["name"]] += len(seq_num)
return doc


@@ -2284,7 +2296,13 @@ def compose_event_page(
Here for backwards compatibility, the Compose class is preferred.
"""
return ComposeEventPage(descriptor, event_counters)(
data, timestamps, seq_num, filled, uid=uid, time=time, validate=validate
data,
timestamps,
seq_num=seq_num,
filled=filled,
uid=uid,
time=time,
validate=validate,
)


@@ -2340,7 +2358,7 @@ def __call__(
"Keys in event['filled'] {} must be a subset of those in "
"event['data'] {}".format(filled.keys(), data.keys())
)
self.event_counters[self.descriptor["name"]] += 1
self.event_counters[self.descriptor["name"]] = seq_num + 1
return doc
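The counter is now pinned to the supplied `seq_num` rather than incremented blindly, so an explicit sequence number keeps later defaults consistent; e.g. (hypothetical values):

```python
event_counters = {"primary": 1}
seq_num = 5                              # caller supplies an explicit sequence number
event_counters["primary"] = seq_num + 1  # the next defaulted event is 6, not 2
```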


1 change: 0 additions & 1 deletion event_model/documents/event.py
@@ -39,7 +39,6 @@ class Event(PartialEvent):
descriptor: Annotated[
str, Field(description="UID of the EventDescriptor to which this Event belongs")
]

seq_num: Annotated[
int,
Field(
4 changes: 3 additions & 1 deletion event_model/documents/event_descriptor.py
@@ -4,6 +4,8 @@

from .generate.type_wrapper import Field, add_extra_schema

Dtype = Literal["string", "number", "array", "boolean", "integer"]


class DataKey(TypedDict):
"""Describes the objects in the data property of Event documents"""
@@ -18,7 +20,7 @@ class DataKey(TypedDict):
]
]
dtype: Annotated[
Literal["string", "number", "array", "boolean", "integer"],
Dtype,
Field(description="The type of the data in the event."),
]
external: NotRequired[
37 changes: 22 additions & 15 deletions event_model/documents/stream_datum.py
@@ -1,4 +1,4 @@
from typing import Any, Dict
from typing import Dict, List

from typing_extensions import Annotated, TypedDict

@@ -18,13 +18,6 @@ class StreamDatum(TypedDict):
"be contiguous for a given stream.",
),
]
datum_kwargs: Annotated[
Dict[str, Any],
Field(
description="Arguments to pass to the Handler to retrieve one "
"quanta of data",
),
]
event_count: Annotated[
int, Field(description="The number of events in this datum.")
]
@@ -35,13 +28,6 @@
"increasing value allows the presence of gaps.",
),
]
stream_name: Annotated[
str,
Field(
description="The name of the stream that this Datum is providing a "
"block of.",
),
]
stream_resource: Annotated[
str,
Field(
@@ -55,3 +41,24 @@
"formatting being '<stream_resource>/<stream_name>/<block_id>",
),
]
data_keys: Annotated[
List[str],
Field(
description="A list to show which data_keys of the "
"Descriptor are being streamed"
),
]
seq_nums: Annotated[
Dict[str, int],
Field(
description="A slice object showing the Event numbers the "
"resource corresponds to"
),
]
indices: Annotated[
Dict[str, int],
Field(
description="A slice object passed to the StreamResource "
"handler so it can hand back data and timestamps"
),
]
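A hypothetical document under the new shape (the `start`/`stop` keys inside `seq_nums` and `indices` are illustrative guesses at the slice fields; all values invented):

```python
from event_model.documents.stream_datum import StreamDatum

stream_datum: StreamDatum = {
    "uid": "sr-uid/0",
    "stream_resource": "sr-uid",
    "block_idx": 0,
    "event_count": 10,
    "event_offset": 0,
    "data_keys": ["det_image"],
    "seq_nums": {"start": 1, "stop": 11},
    "indices": {"start": 0, "stop": 10},
}
```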
10 changes: 1 addition & 9 deletions event_model/documents/stream_resource.py
@@ -1,4 +1,4 @@
from typing import Any, Dict, List
from typing import Any, Dict

from typing_extensions import Annotated, Literal, NotRequired, TypedDict

@@ -55,11 +55,3 @@ class StreamResource(TypedDict):
uid: Annotated[
str, Field(description="Globally unique identifier for this Stream Resource")
]
stream_names: Annotated[
List[str],
Field(
description="List of the stream names this resource provides",
min_items=1,
unique_items=True,
),
]
35 changes: 25 additions & 10 deletions event_model/schemas/stream_datum.json
@@ -8,10 +8,13 @@
"description": "The order in the stream of this block of data. This must be contiguous for a given stream.",
"type": "integer"
},
"datum_kwargs": {
"title": "Datum Kwargs",
"description": "Arguments to pass to the Handler to retrieve one quanta of data",
"type": "object"
"data_keys": {
"title": "Data Keys",
"description": "A list to show which data_keys of the Descriptor are being streamed",
"type": "array",
"items": {
"type": "string"
}
},
"event_count": {
"title": "Event Count",
@@ -23,10 +26,21 @@
"description": "The sequence number of the first event in this block. This increasing value allows the presence of gaps.",
"type": "integer"
},
"stream_name": {
"title": "Stream Name",
"description": "The name of the stream that this Datum is providing a block of.",
"type": "string"
"indices": {
"title": "Indices",
"description": "A slice object passed to the StreamResource handler so it can hand back data and timestamps",
"type": "object",
"additionalProperties": {
"type": "integer"
}
},
"seq_nums": {
"title": "Seq Nums",
"description": "A slice object showing the Event numbers the resource corresponds to",
"type": "object",
"additionalProperties": {
"type": "integer"
}
},
"stream_resource": {
"title": "Stream Resource",
@@ -41,10 +55,11 @@
},
"required": [
"block_idx",
"datum_kwargs",
"data_keys",
"event_count",
"event_offset",
"stream_name",
"indices",
"seq_nums",
"stream_resource",
"uid"
],
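A sketch of checking such a document against this schema through the package's bundled validators (document values hypothetical, as above):

```python
from event_model import DocumentNames, schema_validators

schema_validators[DocumentNames.stream_datum].validate(
    {
        "uid": "sr-uid/0",
        "stream_resource": "sr-uid",
        "block_idx": 0,
        "event_count": 10,
        "event_offset": 0,
        "data_keys": ["det_image"],
        "seq_nums": {"start": 1, "stop": 11},
        "indices": {"start": 0, "stop": 10},
    }
)
```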