Description
Describe the bug
I've used the same code for FeatureGroup ingestion for years, and I recently noticed that ingest() now just hangs. This seems to correlate with upgrading my Mac to macOS Tahoe 26.0.1, but I can't confirm that 100%.
To reproduce
Run the code below and it will hang in waiter.acquire() (see the stack trace). If you flip the USE_SPAWN_MODE flag to True, the ingest runs fine.
Code
```python
import pandas as pd
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
import time
import multiprocessing
import multiprocess

# Toggle this flag to test spawn mode fix
USE_SPAWN_MODE = False  # Set to True to fix the Tahoe hang issue

if __name__ == '__main__':
    if USE_SPAWN_MODE:
        print("Using SPAWN mode (fix for Tahoe)")
        multiprocessing.set_start_method('spawn', force=True)
        multiprocess.set_start_method('spawn', force=True)
    else:
        print("Using default fork mode (will hang on Tahoe)")

    # Create fake data
    data = pd.DataFrame({
        'record_id': [f'id_{i}' for i in range(10)],
        'feature_1': [float(i) for i in range(10)],
        'feature_2': [float(i * 2) for i in range(10)],
        'event_time': [time.time()] * 10
    })

    # Setup SageMaker session
    sagemaker_session = sagemaker.Session()

    # Define feature group
    feature_group_name = 'temp_delete_me'
    feature_group = FeatureGroup(
        name=feature_group_name,
        sagemaker_session=sagemaker_session
    )

    # Create feature definitions
    feature_group.load_feature_definitions(data_frame=data)

    # Create feature group
    print("Creating feature group...")
    feature_group.create(
        s3_uri=f's3://{sagemaker_session.default_bucket()}/featurestore',
        record_identifier_name='record_id',
        event_time_feature_name='event_time',
        role_arn=sagemaker.get_execution_role(),
        enable_online_store=True
    )

    # Wait for feature group to be created (can take 1-2 minutes)
    print("Waiting for feature group to be ready...")
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print(f"Status: {status}... waiting 10 seconds")
        time.sleep(10)
        status = feature_group.describe().get("FeatureGroupStatus")
    print(f"Feature group status: {status}")

    # This will hang on macOS Tahoe with USE_SPAWN_MODE=False
    print("Starting ingest...")
    feature_group.ingest(
        data_frame=data,
        max_workers=2,
        max_processes=2,
        wait=True
    )
    print("Ingest completed!")
```
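For reference, the USE_SPAWN_MODE=True branch can be factored into a small helper. This is a sketch, not an SDK API: `use_spawn_start_method` is a hypothetical name, and it assumes (based on the `multiprocess/pool.py` frames and the `ForkPoolWorker` process names in the stack trace) that the ingest pool follows the third-party `multiprocess` package's default start method, so both libraries are switched. For context, CPython's own `multiprocessing` has defaulted to spawn on macOS since Python 3.8, and its documentation describes fork as unsafe there because macOS system libraries may start threads.

```python
import multiprocessing


def use_spawn_start_method() -> dict:
    """Hypothetical helper: set 'spawn' as the default start method for
    multiprocessing and, if it is installed, for multiprocess.

    Returns the start method each library reports afterwards.
    """
    methods = {}
    # force=True overrides any start method that was already set.
    multiprocessing.set_start_method("spawn", force=True)
    methods["multiprocessing"] = multiprocessing.get_start_method()
    try:
        # Assumption: this dill-based fork of multiprocessing is the pool
        # library used by FeatureGroup.ingest (per the stack trace).
        import multiprocess
    except ImportError:
        return methods  # not installed, nothing else to configure
    multiprocess.set_start_method("spawn", force=True)
    methods["multiprocess"] = multiprocess.get_start_method()
    return methods
```

Call it once at the top of the `if __name__ == '__main__':` block, before `feature_group.ingest(...)`. Spawned children re-import the main module, so keeping the guard is required.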
Full stack trace
```
^CProcess ForkPoolWorker-5:
Process ForkPoolWorker-6:
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /Users/briford/work/workbench/scripts/sagemaker_feature_group_issue.py:61 in <module> │
│ │
│ 58 │ │
│ 59 │ # This will hang on macOS Tahoe with USE_SPAWN_MODE=False │
│ 60 │ print("Starting ingest...") │
│ ❱ 61 │ feature_group.ingest( │
│ 62 │ │ data_frame=data, │
│ 63 │ │ max_workers=2, │
│ 64 │ │ max_processes=2, │
│ │
│ /Users/briford/.pyenv/versions/py312/lib/python3.12/site-packages/sagemaker/feature_store/featur │
│ e_group.py:1183 in ingest │
│ │
│ 1180 │ │ │ profile_name=profile_name, │
│ 1181 │ │ ) │
│ 1182 │ │ │
│ ❱ 1183 │ │ manager.run(data_frame=data_frame, target_stores=target_stores, wait=wait, timeo │
│ 1184 │ │ │
│ 1185 │ │ return manager │
│ 1186 │
│ │
│ /Users/briford/.pyenv/versions/py312/lib/python3.12/site-packages/sagemaker/feature_store/featur │
│ e_group.py:607 in run │
│ │
│ 604 │ │ │ │ data_frame=data_frame, target_stores=target_stores │
│ 605 │ │ │ ) │
│ 606 │ │ else: │
│ ❱ 607 │ │ │ self._run_multi_process( │
│ 608 │ │ │ │ data_frame=data_frame, target_stores=target_stores, wait=wait, timeout=t │
│ 609 │ │ │ ) │
│ 610 │
│ │
│ /Users/briford/.pyenv/versions/py312/lib/python3.12/site-packages/sagemaker/feature_store/featur │
│ e_group.py:519 in _run_multi_process │
│ │
│ 516 │ │ self._async_result = self._processing_pool.amap(f, args) │
│ 517 │ │ │
│ 518 │ │ if wait: │
│ ❱ 519 │ │ │ self.wait(timeout=timeout) │
│ 520 │ │
│ 521 │ @staticmethod │
│ 522 │ def _run_multi_threaded( │
│ │
│ /Users/briford/.pyenv/versions/py312/lib/python3.12/site-packages/sagemaker/feature_store/featur │
│ e_group.py:288 in wait │
│ │
│ 285 │ │ │ self._processing_pool.terminate() │
│ 286 │ │ │ self._processing_pool.close() │
│ 287 │ │ │ self._processing_pool.clear() │
│ ❱ 288 │ │ │ raise i │
│ 289 │ │ else: │
│ 290 │ │ │ # terminate normally │
│ 291 │ │ │ self._processing_pool.close() │
│ │
│ /Users/briford/.pyenv/versions/py312/lib/python3.12/site-packages/sagemaker/feature_store/featur │
│ e_group.py:282 in wait │
│ │
│ 279 │ │ │ │ if timeout is reached. │
│ 280 │ │ """ │
│ 281 │ │ try: │
│ ❱ 282 │ │ │ results = self._async_result.get(timeout=timeout) │
│ 283 │ │ except KeyboardInterrupt as i: │
│ 284 │ │ │ # terminate workers abruptly on keyboard interrupt. │
│ 285 │ │ │ self._processing_pool.terminate() │
│ │
│ /Users/briford/.pyenv/versions/py312/lib/python3.12/site-packages/multiprocess/pool.py:768 in │
│ get │
│ │
│ 765 │ │ self._event.wait(timeout) │
│ 766 │ │
│ 767 │ def get(self, timeout=None): │
│ ❱ 768 │ │ self.wait(timeout) │
│ 769 │ │ if not self.ready(): │
│ 770 │ │ │ raise TimeoutError │
│ 771 │ │ if self._success: │
│ │
│ /Users/briford/.pyenv/versions/py312/lib/python3.12/site-packages/multiprocess/pool.py:765 in │
│ wait │
│ │
│ 762 │ │ return self._success │
│ 763 │ │
│ 764 │ def wait(self, timeout=None): │
│ ❱ 765 │ │ self._event.wait(timeout) │
│ 766 │ │
│ 767 │ def get(self, timeout=None): │
│ 768 │ │ self.wait(timeout) │
│ │
│ /Users/briford/.pyenv/versions/3.12.9/lib/python3.12/threading.py:655 in wait │
│ │
│ 652 │ │ with self._cond: │
│ 653 │ │ │ signaled = self._flag │
│ 654 │ │ │ if not signaled: │
│ ❱ 655 │ │ │ │ signaled = self._cond.wait(timeout) │
│ 656 │ │ │ return signaled │
│ 657 │
│ 658 │
│ │
│ /Users/briford/.pyenv/versions/3.12.9/lib/python3.12/threading.py:355 in wait │
│ │
│ 352 │ │ gotit = False │
│ 353 │ │ try: # restore state no matter what (e.g., KeyboardInterrupt) │
│ 354 │ │ │ if timeout is None: │
│ ❱ 355 │ │ │ │ waiter.acquire() │
│ 356 │ │ │ │ gotit = True │
│ 357 │ │ │ else: │
│ 358 │ │ │ │ if timeout > 0: │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
KeyboardInterrupt
```
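Until the root cause is known, the hang can at least be bounded. Judging by the trace, ingest() passes its timeout down through run() and wait() to AsyncResult.get(timeout=timeout), which raises TimeoutError when the result isn't ready in time. So a call like `feature_group.ingest(data_frame=data, max_workers=2, max_processes=2, wait=True, timeout=300)` should fail fast instead of blocking forever (300 seconds is an arbitrary example value). The self-contained sketch below shows that mechanism with the standard library only. `bounded_wait` is a hypothetical helper, and a ThreadPool stands in for the SDK's process pool so it runs without AWS credentials.

```python
import multiprocessing
import threading
from multiprocessing.pool import ThreadPool


def bounded_wait(worker_finishes: bool, timeout: float) -> str:
    """Hypothetical helper: wait on an async pool result with a timeout.

    `worker_finishes` controls whether the simulated worker ever reports
    back; when it doesn't, get() raises instead of blocking forever.
    """
    gate = threading.Event()
    if worker_finishes:
        gate.set()  # the worker's gate.wait() will return immediately
    with ThreadPool(processes=1) as pool:
        async_result = pool.apply_async(gate.wait)
        try:
            # Same kind of call the SDK's wait() makes on its pool result.
            async_result.get(timeout=timeout)
            return "completed"
        except multiprocessing.TimeoutError:
            return "timed out"
        finally:
            gate.set()  # unblock a stuck worker so the pool shuts down cleanly
```

Note that multiprocessing.TimeoutError is the module's own exception class, not the builtin TimeoutError, so code that wraps ingest() should catch the class raised by the pool library in use.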
Expected behavior
ingest() should write all rows to the feature group and return instead of hanging.
System information
- Hardware: Mac laptop, Apple M1 Max, macOS Tahoe 26.0.1 (25A362)
- SageMaker Python SDK version: 2.253.1
- Framework name (eg. PyTorch) or algorithm (eg. KMeans): None
- Framework version: None
- Python version: 3.12 (3.12.9 via pyenv, per the stack trace)
- CPU or GPU: CPU
- Custom Docker image (Y/N): N
Additional context
Again, I'm not 100% sure this is caused by the Tahoe 26.0.1 upgrade, but maybe Apple changed something about fork() (some kind of security hardening)? This could also be related to #3332: maybe the forked worker processes try to create a new session and something goes wrong there?