Skip to content

Commit

Permalink
add psycode support (#8)
Browse files Browse the repository at this point in the history
adding psycode versions of workflow widgets and experiment objects
  • Loading branch information
ryan-gillis authored Oct 15, 2024
1 parent 90a0b45 commit 013536b
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/np_workflows/experiments/openscope_psycode/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .main_psycode_pilot import new_experiment, Hab, Ephys, validate_selected_workflow
from .psycode_workflow_widget import PsyCode_workflow_widget
217 changes: 217 additions & 0 deletions src/np_workflows/experiments/openscope_psycode/main_psycode_pilot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
import configparser
import contextlib
import copy
import dataclasses
import datetime
import enum
import functools
import pathlib
import platform
import shutil
import threading
import time
import zlib
from typing import ClassVar, Literal, NamedTuple, NoReturn, Optional, TypedDict

import IPython
import IPython.display
import ipywidgets as ipw
import np_config
import np_logging
import np_services
import np_session
import np_workflows
import PIL.Image
import pydantic
from pyparsing import Any
from np_services import (
Service,
Finalizable,
ScriptCamstim, SessionCamstim,
SessionCamstim,
OpenEphys,
Sync,
VideoMVR,
NewScaleCoordinateRecorder,
MouseDirector,
)

logger = np_logging.getLogger(__name__)


class PsyCodeSession(enum.Enum):
"""Enum for the different sessions available, each with different param sets."""

PRETEST = "pretest"
HAB = "hab"
EPHYS = "ephys"


class PsyCodeMixin:
"""Provides project-specific methods and attributes, mainly related to camstim scripts."""

workflow: PsyCodeSession
"""Enum for particular workflow/session, e.g. PRETEST, HAB_60, HAB_90,
EPHYS."""

session: np_session.PipelineSession
mouse: np_session.Mouse
user: np_session.User
platform_json: np_session.PlatformJson

@property
def recorders(self) -> tuple[Service, ...]:
"""Services to be started before stimuli run, and stopped after. Session-dependent."""
match self.workflow:
case PsyCodeSession.PRETEST | PsyCodeSession.EPHYS:
return (Sync, VideoMVR, OpenEphys)
case PsyCodeSession.HAB:
return (Sync, VideoMVR)

@property
def stims(self) -> tuple[Service, ...]:
return (SessionCamstim, )

def initialize_and_test_services(self) -> None:
"""Configure, initialize (ie. reset), then test all services."""

MouseDirector.user = self.user.id
MouseDirector.mouse = self.mouse.id

OpenEphys.folder = self.session.folder

NewScaleCoordinateRecorder.log_root = self.session.npexp_path
NewScaleCoordinateRecorder.log_name = self.platform_json.path.name

SessionCamstim.labtracks_mouse_id = self.mouse.id
SessionCamstim.lims_user_id = self.user.id

self.configure_services()

super().initialize_and_test_services()

def update_state(self) -> None:
"Store useful but non-essential info."
self.mouse.state['last_session'] = self.session.id
self.mouse.state['last_PsyCode_session'] = str(self.workflow)
if self.mouse == 366122:
return
match self.workflow:
case PsyCodeSession.PRETEST:
return
case PsyCodeSession.HAB:
self.session.project.state['latest_hab'] = self.session.id
case PsyCodeSession.EPHYS:
self.session.project.state['latest_ephys'] = self.session.id
self.session.project.state['sessions'] = self.session.project.state.get('sessions', []) + [self.session.id]

def run_stim(self) -> None:

self.update_state()

if not SessionCamstim.is_ready_to_start():
raise RuntimeError("SessionCamstim is not ready to start.")

np_logging.web(f'PsyCode_{self.workflow.name.lower()}').info(f"Started session {self.mouse.mtrain.stage['name']}")
SessionCamstim.start()

with contextlib.suppress(Exception):
while not SessionCamstim.is_ready_to_start():
time.sleep(2.5)

if isinstance(SessionCamstim, Finalizable):
SessionCamstim.finalize()

with contextlib.suppress(Exception):
np_logging.web(f'PsyCode_{self.workflow.name.lower()}').info(f"Finished session {self.mouse.mtrain.stage['name']}")


def copy_data_files(self) -> None:
super().copy_data_files()

# When all processing completes, camstim Agent class passes data and uuid to
# /camstim/lims BehaviorSession class, and write_behavior_data() writes a
# final .pkl with default name YYYYMMDDSSSS_mouseID_foragingID.pkl
# - if we have a foraging ID, we can search for that
if None == (stim_pkl := next(self.session.npexp_path.glob(f'{self.session.date:%y%m%d}*_{self.session.mouse}_*.pkl'), None)):
logger.warning('Did not find stim file on npexp matching the format `YYYYMMDDSSSS_mouseID_foragingID.pkl`')
return
assert stim_pkl
if not self.session.platform_json.foraging_id:
self.session.platform_json.foraging_id = stim_pkl.stem.split('_')[-1]
new_stem = f'{self.session.folder}.stim'
logger.debug(f'Renaming stim file copied to npexp: {stim_pkl} -> {new_stem}')
stim_pkl = stim_pkl.rename(stim_pkl.with_stem(new_stem))

# remove other stim pkl, which is nearly identical, if it was also copied
for pkl in self.session.npexp_path.glob('*.pkl'):
if (
self.session.folder not in pkl.stem
and
abs(pkl.stat().st_size - stim_pkl.stat().st_size) < 1e6
):
logger.debug(f'Deleting extra stim pkl copied to npexp: {pkl.stem}')
pkl.unlink()


def validate_selected_workflow(session: PsyCodeSession, mouse: np_session.Mouse) -> None:
for workflow in ('hab', 'ephys'):
if (
workflow in session.value.lower()
and workflow not in mouse.mtrain.stage['name'].lower()
) or (
session.value.lower() == 'ephys' and 'hab' in mouse.mtrain.stage['name'].lower()
):
raise ValueError(f"Workflow selected ({session.value}) does not match MTrain stage ({mouse.mtrain.stage['name']}): please check cells above.")


class Hab(PsyCodeMixin, np_workflows.PipelineHab):
def __init__(self, *args, **kwargs):
self.services = (
MouseDirector,
Sync,
VideoMVR,
self.imager,
NewScaleCoordinateRecorder,
SessionCamstim,
)
super().__init__(*args, **kwargs)


class Ephys(PsyCodeMixin, np_workflows.PipelineEphys):
def __init__(self, *args, **kwargs):
self.services = (
MouseDirector,
Sync,
VideoMVR,
self.imager,
NewScaleCoordinateRecorder,
SessionCamstim,
OpenEphys,
)
super().__init__(*args, **kwargs)


# --------------------------------------------------------------------------------------


def new_experiment(
mouse: int | str | np_session.Mouse,
user: str | np_session.User,
workflow: PsyCodeSession,
) -> Ephys | Hab:
"""Create a new experiment for the given mouse and user."""
match workflow:
case PsyCodeSession.PRETEST | PsyCodeSession.EPHYS:
experiment = Ephys(mouse, user)
case PsyCodeSession.HAB:
experiment = Hab(mouse, user)
case _:
raise ValueError(f"Invalid workflow type: {workflow}")
experiment.workflow = workflow

with contextlib.suppress(Exception):
np_logging.web(f'PsyCode_{experiment.workflow.name.lower()}').info(f"{experiment} created")

return experiment

Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import configparser
import contextlib
import copy
import enum
import functools
from typing import ClassVar, Literal, NamedTuple, NoReturn, Optional, TypedDict

import IPython.display
import ipywidgets as ipw
import np_config
import np_logging
import np_session
import np_workflows
from pyparsing import Any

from np_workflows.experiments.openscope_psycode.main_psycode_pilot import PsyCodeSession

global_state = {}
"""Global variable for persisting widget states."""

# for widget, before creating a experiment --------------------------------------------- #

class SelectedSession:
def __init__(self, session: str | PsyCodeSession, mouse: str | int | np_session.Mouse):
if isinstance(session, str):
session = PsyCodeSession(session)
self.session = session
self.mouse = str(mouse)

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.session}, {self.mouse})"


def PsyCode_workflow_widget(
mouse: str | int | np_session.Mouse,
) -> SelectedSession:
"""Select a stimulus session (hab, pretest, ephys) to run.
An object with mutable attributes is returned, so the selected session can be
updated along with the GUI selection. (Preference would be to return an enum
directly, and change it's value, but that doesn't seem possible.)
"""

selection = SelectedSession(PsyCodeSession.PRETEST, mouse)

session_dropdown = ipw.Select(
options=tuple(_.value for _ in PsyCodeSession),
description="Session",
)

def update_selection():
selection.__init__(str(session_dropdown.value), str(mouse))

if (previously_selected_value := global_state.get('selected_session')):
session_dropdown.value = previously_selected_value
update_selection()

console = ipw.Output()
with console:
if last_session := np_session.Mouse(selection.mouse).state.get('last_PsyCode_session'):
print(f"{mouse} last session: {last_session}")
print(f"Selected: {selection.session}")

def update(change):
if change["name"] != "value":
return
if (options := getattr(change["owner"], "options", None)) and change[
"new"
] not in options:
return
if change["new"] == change["old"]:
return
update_selection()
with console:
print(f"Selected: {selection.session}")
global_state['selected_session'] = selection.session.value

session_dropdown.observe(update, names='value')

IPython.display.display(ipw.VBox([session_dropdown, console]))

return selection

0 comments on commit 013536b

Please sign in to comment.