Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 145 additions & 41 deletions netbox/dcim/forms/bulk_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _

from circuits.models import Circuit
from dcim.choices import *
from dcim.constants import *
from dcim.models import *
Expand Down Expand Up @@ -1398,19 +1399,52 @@ def save(self, *args, **kwargs):
#

class CableImportForm(NetBoxModelImportForm):
"""
CSV bulk import form for cables.

Supports dynamic parent model resolution - terminations are identified by their parent
object (device, circuit, or power panel) and termination name.

The parent field resolves to different models based on the termination type
See CABLE_PARENT_MAPPING for supported termination types.
"""

# Map cable termination content types to their parent model and lookup field.
#
# This mapping enables dynamic parent model resolution during cable CSV imports.
# Each entry maps a termination type to a tuple of (parent_content_type, accessor):
#
# Format: 'app.model': ('parent_app.ParentModel', 'accessor')
#
CABLE_PARENT_MAPPING = {
'dcim.interface': ('dcim.Device', 'name'),
'dcim.consoleport': ('dcim.Device', 'name'),
'dcim.consoleserverport': ('dcim.Device', 'name'),
'dcim.powerport': ('dcim.Device', 'name'),
'dcim.poweroutlet': ('dcim.Device', 'name'),
'dcim.frontport': ('dcim.Device', 'name'),
'dcim.rearport': ('dcim.Device', 'name'),
'circuits.circuittermination': ('circuits.Circuit', 'cid'),
'dcim.powerfeed': ('dcim.PowerPanel', 'name'),
}
# Map parent model name to (parent_field_name, termination_name_field, value_transform)
TERMINATION_FIELDS = {
'Circuit': ('circuit', 'term_side', str.upper),
'Device': ('device', 'name', None),
'PowerPanel': ('power_panel', 'name', None),
}

# Termination A
side_a_site = CSVModelChoiceField(
label=_('Side A site'),
queryset=Site.objects.all(),
required=False,
to_field_name='name',
help_text=_('Site of parent device A (if any)'),
help_text=_('Site of parent A (if any)')
)
side_a_device = CSVModelChoiceField(
label=_('Side A device'),
queryset=Device.objects.all(),
to_field_name='name',
help_text=_('Device name')
side_a_parent = forms.CharField(
label=_('Side A parent'),
help_text=_('Device name, Circuit CID, or Power Panel name')
)
side_a_type = CSVContentTypeField(
label=_('Side A type'),
Expand All @@ -1429,13 +1463,11 @@ class CableImportForm(NetBoxModelImportForm):
queryset=Site.objects.all(),
required=False,
to_field_name='name',
help_text=_('Site of parent device B (if any)'),
help_text=_('Site of parent B (if any)')
)
side_b_device = CSVModelChoiceField(
label=_('Side B device'),
queryset=Device.objects.all(),
to_field_name='name',
help_text=_('Device name')
side_b_parent = forms.CharField(
label=_('Side B parent'),
help_text=_('Device name, Circuit CID, or Power Panel name')
)
side_b_type = CSVContentTypeField(
label=_('Side B type'),
Expand Down Expand Up @@ -1484,29 +1516,14 @@ class CableImportForm(NetBoxModelImportForm):
class Meta:
model = Cable
fields = [
'side_a_site', 'side_a_device', 'side_a_type', 'side_a_name', 'side_b_site', 'side_b_device', 'side_b_type',
'side_a_site', 'side_a_parent', 'side_a_type', 'side_a_name', 'side_b_site', 'side_b_parent', 'side_b_type',
'side_b_name', 'type', 'status', 'tenant', 'label', 'color', 'length', 'length_unit', 'description',
'comments', 'tags',
]

def __init__(self, data=None, *args, **kwargs):
super().__init__(data, *args, **kwargs)

if data:
# Limit choices for side_a_device to the assigned side_a_site
if side_a_site := data.get('side_a_site'):
side_a_device_params = {f'site__{self.fields["side_a_site"].to_field_name}': side_a_site}
self.fields['side_a_device'].queryset = self.fields['side_a_device'].queryset.filter(
**side_a_device_params
)

# Limit choices for side_b_device to the assigned side_b_site
if side_b_site := data.get('side_b_site'):
side_b_device_params = {f'site__{self.fields["side_b_site"].to_field_name}': side_b_site}
self.fields['side_b_device'].queryset = self.fields['side_b_device'].queryset.filter(
**side_b_device_params
)

def _clean_side(self, side):
"""
Derive a Cable's A/B termination objects.
Expand All @@ -1515,31 +1532,118 @@ def _clean_side(self, side):
"""
assert side in 'ab', f"Invalid side designation: {side}"

device = self.cleaned_data.get(f'side_{side}_device')
content_type = self.cleaned_data.get(f'side_{side}_type')
site = self.cleaned_data.get(f'side_{side}_site')
parent_value = self.cleaned_data.get(f'side_{side}_parent')
name = self.cleaned_data.get(f'side_{side}_name')
if not device or not content_type or not name:

if not parent_value or not content_type or not name: # pragma: no cover
return None

model = content_type.model_class()
# Get the parent model mapping from the submitted content_type
parent_map = self.CABLE_PARENT_MAPPING.get(f'{content_type.app_label}.{content_type.model}')
# This should never happen
assert parent_map, (
'Unknown cable termination content type parent mapping: '
f'{content_type.app_label}.{content_type.model}'
)

parent_content_type, parent_accessor = parent_map
parent_app_label, parent_model_name = parent_content_type.split('.')

# Get the parent model class
try:
parent_ct = ContentType.objects.get(app_label=parent_app_label.lower(), model=parent_model_name.lower())
parent_model: Device | PowerPanel | Circuit = parent_ct.model_class()
except ContentType.DoesNotExist: # pragma: no cover
# This should never happen
raise AssertionError(f'Unknown cable termination parent content type: {parent_content_type}')

# Build query for parent lookup
parent_query = {parent_accessor: parent_value}
# Add site to query if provided
if site:
parent_query['site'] = site

# Look up the parent object
try:
if device.virtual_chassis and device.virtual_chassis.master == device and \
model.objects.filter(device=device, name=name).count() == 0:
termination_object = model.objects.get(device__in=device.virtual_chassis.members.all(), name=name)
parent_object = parent_model.objects.get(**parent_query)
except parent_model.DoesNotExist:
raise forms.ValidationError(
_('Side {side_upper}: {model_name} not found: {value}').format(
side_upper=side.upper(), model_name=parent_model.__name__, value=parent_value
)
)
except parent_model.MultipleObjectsReturned:
raise forms.ValidationError(
_('Side {side_upper}: Multiple {model_name} objects found: {value}').format(
side_upper=side.upper(), model_name=parent_model.__name__, value=parent_value
)
)

# Get the termination model class
termination_model = content_type.model_class()

# Build the query to find the termination object
field_mapping = self.TERMINATION_FIELDS.get(parent_model.__name__)
if not field_mapping: # pragma: no cover
return None

parent_field, name_field, value_transform = field_mapping
query = {parent_field: parent_object}

if value_transform:
name = value_transform(name)

if name:
query[name_field] = name

# Add site to query if provided (for site-scoped parents)
if site and parent_field in ('device', 'power_panel'):
query[f'{parent_field}__site'] = site

# Look up the termination object
try:
# Handle virtual chassis for device-based terminations
if (parent_field == 'device' and
parent_object.virtual_chassis and
parent_object.virtual_chassis.master == parent_object and
termination_model.objects.filter(**query).count() == 0):
query[f'{parent_field}__in'] = parent_object.virtual_chassis.members.all()
query.pop(parent_field, None)
termination_object = termination_model.objects.get(**query)
else:
termination_object = model.objects.get(device=device, name=name)
termination_object = termination_model.objects.get(**query)

# Check if already connected to a cable
if termination_object.cable is not None and termination_object.cable != self.instance:
raise forms.ValidationError(
_("Side {side_upper}: {device} {termination_object} is already connected").format(
side_upper=side.upper(), device=device, termination_object=termination_object
_('Side {side_upper}: {parent} {termination} is already connected').format(
side_upper=side.upper(), parent=parent_object, termination=termination_object
)
)
except ObjectDoesNotExist:
raise forms.ValidationError(
_("{side_upper} side termination not found: {device} {name}").format(
side_upper=side.upper(), device=device, name=name

# Circuit terminations can also be connected to provider networks
if (name_field == 'term_side' and
hasattr(termination_object, '_provider_network') and
termination_object._provider_network is not None):
raise forms.ValidationError(
_('Side {side_upper}: {parent} {termination} is already connected to a provider network').format(
side_upper=side.upper(), parent=parent_object, termination=termination_object
)
)
except termination_model.DoesNotExist:
raise forms.ValidationError(
_('Side {side_upper}: {model_name} not found: {parent} {name}').format(
side_upper=side.upper(),
model_name=termination_model.__name__,
parent=parent_object, name=name or '',
),
)
except termination_model.MultipleObjectsReturned: # pragma: no cover
# This should never happen
raise AssertionError('Multiple termination objects returned for query: {query}'.format(query=query))

setattr(self.instance, f'{side}_terminations', [termination_object])
return termination_object

Expand Down
Loading
Loading