|
4 | 4 | import math |
5 | 5 | import time |
6 | 6 | from collections.abc import AsyncIterable |
| 7 | +from decimal import Decimal |
7 | 8 | from typing import TypeVar |
8 | 9 |
|
9 | 10 | import aiohttp |
|
19 | 20 | from yarl import URL |
20 | 21 |
|
21 | 22 | from aleph.vm.conf import settings |
| 23 | +from aleph.vm.orchestrator.utils import ( |
| 24 | + get_community_wallet_address, |
| 25 | + is_after_community_wallet_start, |
| 26 | +) |
22 | 27 | from aleph.vm.pool import VmPool |
23 | 28 | from aleph.vm.utils import create_task_log_exceptions |
24 | 29 |
|
|
35 | 40 | logger = logging.getLogger(__name__) |
36 | 41 |
|
37 | 42 | Value = TypeVar("Value") |
| 43 | +COMMUNITY_STREAM_RATIO = Decimal(0.2) |
38 | 44 |
|
39 | 45 |
|
40 | 46 | async def retry_generator(generator: AsyncIterable[Value], max_seconds: int = 8) -> AsyncIterable[Value]: |
@@ -154,6 +160,7 @@ async def monitor_payments(app: web.Application): |
154 | 160 | try: |
155 | 161 | logger.debug("Monitoring balances task running") |
156 | 162 | await check_payment(pool) |
| 163 | + logger.debug("Monitoring balances task ended") |
157 | 164 | except Exception as e: |
158 | 165 | # Catch all exceptions as to never stop the task. |
159 | 166 | logger.warning(f"check_payment failed {e}", exc_info=True) |
@@ -191,31 +198,62 @@ async def check_payment(pool: VmPool): |
191 | 198 | logger.debug(f"Stopping {last_execution} due to insufficient balance") |
192 | 199 | await pool.stop_vm(last_execution.vm_hash) |
193 | 200 | required_balance = await compute_required_balance(executions) |
| 201 | + community_wallet = await get_community_wallet_address() |
| 202 | + if not community_wallet: |
| 203 | + logger.error("Monitor payment ERROR: No community wallet set. Cannot check community payment") |
194 | 204 |
|
195 | 205 | # Check if the balance held in the wallet is sufficient stream tier resources |
196 | 206 | for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items(): |
197 | 207 | for chain, executions in chains.items(): |
198 | 208 | try: |
199 | 209 | stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain) |
| 210 | + |
200 | 211 | logger.debug( |
201 | | - f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}" |
| 212 | + f"Stream flow from {sender} to {settings.PAYMENT_RECEIVER_ADDRESS} = {stream} {chain.value}" |
202 | 213 | ) |
| 214 | + except ValueError as error: |
| 215 | + logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}") |
| 216 | + continue |
| 217 | + try: |
| 218 | + community_stream = await get_stream(sender=sender, receiver=community_wallet, chain=chain) |
| 219 | + logger.debug(f"Stream flow from {sender} to {community_wallet} (community) : {stream} {chain}") |
| 220 | + |
203 | 221 | except ValueError as error: |
204 | 222 | logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}") |
205 | 223 | continue |
206 | 224 |
|
207 | | - required_stream = await compute_required_flow(executions) |
208 | | - logger.debug(f"Required stream for Sender {sender} executions: {required_stream}") |
209 | | - # Stop executions until the required stream is reached |
210 | | - while (stream + settings.PAYMENT_BUFFER) < required_stream: |
211 | | - try: |
212 | | - last_execution = executions.pop(-1) |
213 | | - except IndexError: # Empty list |
214 | | - logger.debug("No execution can be maintained due to insufficient stream") |
| 225 | + while executions: |
| 226 | + executions_with_community = [ |
| 227 | + execution |
| 228 | + for execution in executions |
| 229 | + if await is_after_community_wallet_start(execution.times.started_at) |
| 230 | + ] |
| 231 | + |
| 232 | + required_stream = await compute_required_flow(executions_with_community) |
| 233 | + executions_without_community = [ |
| 234 | + execution |
| 235 | + for execution in executions |
| 236 | + if not await is_after_community_wallet_start(execution.times.started_at) |
| 237 | + ] |
| 238 | + logger.info("flow community %s", executions_with_community) |
| 239 | + logger.info("flow without community %s", executions_without_community) |
| 240 | + required_stream_without_community = await compute_required_flow(executions_without_community) |
| 241 | + |
| 242 | + required_crn_stream = required_stream * (1 - COMMUNITY_STREAM_RATIO) + required_stream_without_community |
| 243 | + required_community_stream = required_stream * COMMUNITY_STREAM_RATIO |
| 244 | + logger.debug( |
| 245 | + f"Stream for senders {sender} {len(executions)} executions. CRN : {stream} / {required_crn_stream}." |
| 246 | + f"Community: {community_stream} / {required_community_stream}" |
| 247 | + ) |
| 248 | + # Can pay all executions |
| 249 | + if (stream + settings.PAYMENT_BUFFER) > required_crn_stream and ( |
| 250 | + community_stream + settings.PAYMENT_BUFFER |
| 251 | + ) > required_community_stream: |
215 | 252 | break |
216 | | - logger.debug(f"Stopping {last_execution} due to insufficient stream") |
| 253 | + # Stop executions until the required stream is reached |
| 254 | + last_execution = executions.pop(-1) |
| 255 | + logger.info(f"Stopping {last_execution} of {sender} due to insufficient stream") |
217 | 256 | await pool.stop_vm(last_execution.vm_hash) |
218 | | - required_stream = await compute_required_flow(executions) |
219 | 257 |
|
220 | 258 |
|
221 | 259 | async def start_payment_monitoring_task(app: web.Application): |
|
0 commit comments