Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 45 additions & 20 deletions contact_map/contact_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ def __init__(self, topology, query, haystack, cutoff, n_neighbors_ignored):
self._atom_idx_to_residue_idx = {atom.index: atom.residue.index
for atom in self.topology.atoms}

def _check_compatibility(self, other):
assert self.cutoff == other.cutoff
assert self.topology == other.topology
assert self.query == other.query
assert self.haystack == other.haystack
assert self.n_neighbors_ignored == other.n_neighbors_ignored

def save_to_file(self, filename, mode="w"):
"""Save this object to the given file.

Expand Down Expand Up @@ -519,16 +526,6 @@ def __init__(self, frame, query=None, haystack=None, cutoff=0.45,
(self._atom_contacts, self._residue_contacts) = contact_maps


class ContactTrajectory(ContactObject):
"""
Contact map (atomic and residue) for each individual trajectory frame.

NOT YET IMPLEMENTED. I'm not sure whether this gives appreciable speed
improvements over running contact map over and over.
"""
pass


class ContactFrequency(ContactObject):
"""
Contact frequency (atomic and residue) for a trajectory.
Expand Down Expand Up @@ -556,24 +553,22 @@ class ContactFrequency(ContactObject):
"""
def __init__(self, trajectory, query=None, haystack=None, cutoff=0.45,
n_neighbors_ignored=2, frames=None):
self._trajectory = trajectory
if frames is None:
frames = range(len(trajectory))
self.frames = frames
self._n_frames = len(frames)
super(ContactFrequency, self).__init__(trajectory.topology,
query, haystack, cutoff,
n_neighbors_ignored)
self._build_contact_map()
self._build_contact_map(trajectory)

def _build_contact_map(self):
def _build_contact_map(self, trajectory):
# We actually build the contact map on a per-residue basis, although
# we save it on a per-atom basis. This allows us ignore
# n_nearest_neighbor residues.
# TODO: this whole thing should be cleaned up and should replace
# MDTraj's really slow old computer_contacts by using MDTraj's new
# MDTraj's really slow old compute_contacts by using MDTraj's new
# neighborlists (unless the MDTraj people do that first).
trajectory = self.trajectory
self._atom_contacts_count = collections.Counter([])
self._residue_contacts_count = collections.Counter([])

Expand All @@ -591,15 +586,45 @@ def _build_contact_map(self):
self._atom_contacts_count.update(frame_atom_contacts)
self._residue_contacts_count += frame_residue_contacts

@property
def trajectory(self):
return self._trajectory

@property
def n_frames(self):
"""Number of frames in the mapped trajectory"""
return self._n_frames

def add_contact_frequency(self, other):
"""Add results from `other` to the internal counter.

Parameters
----------
other : :class:`.ContactFrequency`
contact frequency made from the frames to remove from this
contact frequency
"""
self._check_compatibility(other)
self._atom_contacts_count += other._atom_contacts_count
self._residue_contacts_count += other._residue_contacts_count
self._n_frames += other._n_frames


def subtract_contact_frequency(self, other):
"""Subtracts results from `other` from internal counter.

Note that this is intended for the case that you're removing a
subtrajectory of the already-calculated trajectory. If you want to
compare two different contact frequency maps, use
:class:`.ContactDifference`.

Parameters
----------
other : :class:`.ContactFrequency`
contact frequency made from the frames to remove from this
contact frequency
"""
self._check_compatibility(other)
self._atom_contacts_count -= other._atom_contacts_count
self._residue_contacts_count -= other._residue_contacts_count
self._n_frames -= other._n_frames

@property
def atom_contacts(self):
"""Atoms pairs mapped to fraction of trajectory with that contact"""
Expand Down Expand Up @@ -633,7 +658,7 @@ class ContactDifference(ContactObject):
def __init__(self, positive, negative):
self.positive = positive
self.negative = negative
# TODO: verify that the combination is compatible: same topol, etc
positive._check_compatibility(negative)
super(ContactDifference, self).__init__(positive.topology,
positive.query,
positive.haystack,
Expand Down
4 changes: 2 additions & 2 deletions contact_map/min_dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ def __init__(self, trajectory, cutoff, frame_number=0, excluded=None):
self._calculate_nearest(trajectory, self.cutoff,
self.frame_number, self.excluded)

def _calculate_nearest(self, trajectory, cutoff, frame_number,
excluded):
@staticmethod
def _calculate_nearest(trajectory, cutoff, frame_number, excluded):
"""
Calculate the nearest atoms from the input data.

Expand Down
39 changes: 39 additions & 0 deletions contact_map/tests/test_contact_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ def test_saving(self, idx):
assert m.atom_contacts.counter == m2.atom_contacts.counter
os.remove(test_file)

# TODO: add tests for ContactObject._check_consistency


class TestContactFrequency(object):
def setup(self):
Expand Down Expand Up @@ -296,6 +298,43 @@ def test_most_common_atoms_for_contact(self, select_by):
]
assert set(most_common_2_3_frozenset) == set(expected_2_3)

def test_add_contact_frequency(self):
# self.map has all 5 frames
# we can figure out what the [0:4] would look like
start = ContactFrequency(trajectory=traj[:4],
cutoff=0.075,
n_neighbors_ignored=0)
add_in = ContactFrequency(trajectory=traj[4:],
cutoff=0.075,
n_neighbors_ignored=0)

start.add_contact_frequency(add_in)

assert start.atom_contacts.counter == \
self.map.atom_contacts.counter

assert start.residue_contacts.counter == \
self.map.residue_contacts.counter

def test_subtract_contact_frequency(self):
first_four = ContactFrequency(trajectory=traj[:4],
cutoff=0.075,
n_neighbors_ignored=0)
last_frame = ContactFrequency(trajectory=traj[4:],
cutoff=0.075,
n_neighbors_ignored=0)
test_subject = ContactFrequency(trajectory=traj,
cutoff=0.075,
n_neighbors_ignored=0)

test_subject.subtract_contact_frequency(first_four)

assert test_subject.atom_contacts.counter == \
last_frame.atom_contacts.counter

assert test_subject.residue_contacts.counter == \
last_frame.residue_contacts.counter


class TestContactCount(object):
def setup(self):
Expand Down