Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue984 refactor to implement scheduled polls (second pull replaces first.) #985

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/source/Explanation/SarraPluginDev.rst
Original file line number Diff line number Diff line change
Expand Up @@ -585,12 +585,17 @@ for detailed information about call signatures and return values, etc...
| | permanent name. |
| | |
| | return the new name for the downloaded/sent file. |
| | |
+---------------------+----------------------------------------------------+
| download(self,msg) | replace built-in downloader return true on success |
| | takes message as argument. |
+---------------------+----------------------------------------------------+
| gather(self) | gather messages from a source, returns a list of |
| | messages. |
| | can also return a tuple where the first element |
| | is a boolean flag keep_going indicating whether |
| | to stop gather processing. |
| | |
+---------------------+----------------------------------------------------+
| | Called every housekeeping interval (minutes) |
| | used to clean cache, check for occasional issues. |
Expand Down
6 changes: 5 additions & 1 deletion docs/source/How2Guides/FlowCallbacks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,11 @@ Other entry_points, extracted from sarracenia/flowcb/__init__.py ::


def gather(self):
Task: gather notification messages from a source... return a list of notification messages.
Task: gather notification messages from a source... return either:
* a list of notification messages, or
* a tuple, (bool:keep_going, list of messages)
* to curtail further gathers in this cycle.

return []

def metrics_report(self) -> dict:
Expand Down
2 changes: 2 additions & 0 deletions docs/source/fr/CommentFaire/FlowCallbacks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ Autres entry_points, extraits de sarracenia/flowcb/__init__.py ::

def gather(self):
Task: gather notification messages from a source... return a list of notification messages.
can also return tuple (keep_going, new_messages) where keep_going is a flag
that when False stops processing of further gather routines.
return []

"""
Expand Down
4 changes: 4 additions & 0 deletions docs/source/fr/Explication/SarraPluginDev.rst
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,10 @@ pour des informations détaillées sur les signatures d’appel et les valeurs d
+---------------------+----------------------------------------------------+
| gather(self) | Rassembler les messages a la source, retourne une |
| | une liste de messages. |
| | on peut également retourner un tuple dont le |
| | première élément est une valeur booléen keep_going |
| | qui peut arreter l´execution des gather. |
| | |
+---------------------+----------------------------------------------------+
| | Appelé à chaque intervalle housekeeping (minutes). |
| | utilisé pour nettoyer le cache, vérifier les |
Expand Down
5 changes: 5 additions & 0 deletions sarracenia/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1984,6 +1984,11 @@ def finalize(self, component=None, config=None):

if (component not in ['poll' ]):
self.path = list(map( os.path.expanduser, self.path ))
else:
if self.sleep > 1:
self.scheduled_interval = self.sleep
self.sleep=1


if self.vip and not features['vip']['present']:
logger.critical( f"vip feature requested, but missing library: {' '.join(features['vip']['modules_needed'])} " )
Expand Down
39 changes: 36 additions & 3 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ def run(self):
current_rate = 0
total_messages = 1
start_time = nowflt()
now=start_time
had_vip = False
current_sleep = self.o.sleep
last_time = start_time
Expand Down Expand Up @@ -460,6 +461,9 @@ def run(self):
)
stopping = True

if now > next_housekeeping or stopping:
next_housekeeping = self._runHousekeeping(now)

self.have_vip = self.has_vip()
if (self.o.component == 'poll') or self.have_vip:

Expand All @@ -481,7 +485,6 @@ def run(self):
spamming = False

self.filter()

self._runCallbacksWorklist('after_accept')

logger.debug(
Expand Down Expand Up @@ -1093,9 +1096,18 @@ def filter(self) -> None:

def gather(self) -> None:
so_far=0
keep_going=True
for p in self.plugins["gather"]:
try:
new_incoming = p(self.o.batch-so_far)
retval = p(self.o.batch-so_far)

# To avoid having to modify all existing gathers, support old API.
if type(retval) == tuple:
keep_going, new_incoming = retval
elif type(retval) == list:
new_incoming = retval
else:
logger.error( f"flowCallback plugin gather routine {p} returned unexpected type: {type(retval)}. Expected tuple of boolean and list of new messages" )
except Exception as ex:
logger.error( f'flowCallback plugin {p} crashed: {ex}' )
logger.debug( "details:", exc_info=True )
Expand All @@ -1106,9 +1118,30 @@ def gather(self) -> None:
so_far += len(new_incoming)

# if we gathered enough with a subset of plugins then return.
if so_far >= self.o.batch:
if not keep_going or (so_far >= self.o.batch):
if (self.o.component == 'poll' ):
self.worklist.poll_catching_up=True

return

# gather is an extended version of poll.
if self.o.component != 'poll':
return

if len(self.worklist.incoming) > 0:
logger.info('ingesting %d postings into duplicate suppression cache' % len(self.worklist.incoming) )
self.worklist.poll_catching_up = True
return
else:
self.worklist.poll_catching_up = False

if self.have_vip:
for plugin in self.plugins['poll']:
new_incoming = plugin()
if len(new_incoming) > 0:
self.worklist.incoming.extend(new_incoming)



def do(self) -> None:

Expand Down
30 changes: 6 additions & 24 deletions sarracenia/flow/poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,17 @@ def __init__(self, options):
else:
logger.info( f"Good! post_exchange: {px} and exchange: {self.o.exchange} match so multiple instances to share a poll." )

if not 'poll' in ','.join(self.plugins['load']):
if not 'scheduled' in ','.join(self.plugins['load']):
self.plugins['load'].append('sarracenia.flowcb.scheduled.poll.Poll')

if not 'flowcb.poll.Poll' in ','.join(self.plugins['load']):
logger.info( f"adding poll plugin, because missing from: {self.plugins['load']}" )
self.plugins['load'].append('sarracenia.flowcb.poll.Poll')

if options.vip:
self.plugins['load'].insert(
0, 'sarracenia.flowcb.gather.message.Message')
self.plugins['load'].insert( 0, 'sarracenia.flowcb.gather.message.Message')

self.plugins['load'].insert(0,
'sarracenia.flowcb.post.message.Message')
self.plugins['load'].insert( 0, 'sarracenia.flowcb.post.message.Message')

if self.o.nodupe_ttl < self.o.fileAgeMax:
logger.warning( f"nodupe_ttl < fileAgeMax means some files could age out of the cache and be re-ingested ( see : https://github.com/MetPX/sarracenia/issues/904")
Expand All @@ -97,22 +98,3 @@ def do(self):
logger.debug('processing %d messages worked! (stop requested: %s)' %
(len(self.worklist.incoming), self._stop_requested))
self.worklist.incoming = []


def gather(self):

super().gather()

if len(self.worklist.incoming) > 0:
logger.info('ingesting %d postings into duplicate suppression cache' % len(self.worklist.incoming) )
self.worklist.poll_catching_up = True
return
else:
self.worklist.poll_catching_up = False

if self.have_vip:
for plugin in self.plugins['poll']:
new_incoming = plugin()
if len(new_incoming) > 0:
self.worklist.incoming.extend(new_incoming)

19 changes: 14 additions & 5 deletions sarracenia/flowcb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,23 @@ def ack(self,messagelist) -> None::

Task: acknowledge messages from a gather source.

def gather(self, messageCountMax) -> list::
def gather(self, messageCountMax) -> (gather_more, messages) ::

Task: gather messages from a source... return a list of messages
Task: gather messages from a source... return a tuple:

in a poll, gather is always called, regardless of vip posession.
in all other components, gather is only called when in posession
* gather_more ... bool whether to continue gathering
* messages ... list of messages

or just return a list of messages.

In a poll, gather is always called, regardless of vip posession.

In all other components, gather is only called when in posession
of the vip.
return []

return (True, list)
OR
return list

def after_accept(self,worklist) -> None::

Expand Down
2 changes: 1 addition & 1 deletion sarracenia/flowcb/gather/am.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,4 +485,4 @@ def gather(self, messageCountMax):
except Exception as e:
logger.error(f"Unable to generate bulletin file. Error message: {e}")

return newmsg
return (True, newmsg)
8 changes: 4 additions & 4 deletions sarracenia/flowcb/gather/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,19 +692,19 @@ def gather(self, messageCountMax):
if len(self.queued_messages) > self.o.batch:
messages = self.queued_messages[0:self.o.batch]
self.queued_messages = self.queued_messages[self.o.batch:]
return messages
return (True, messages)

elif len(self.queued_messages) > 0:
messages = self.queued_messages
self.queued_messages = []

if self.o.sleep < 0:
return messages
return (True, messages)
else:
messages = []

if self.primed:
return self.wakeup()
return (True, self.wakeup())

cwd = os.getcwd()

Expand Down Expand Up @@ -740,4 +740,4 @@ def gather(self, messageCountMax):
messages = messages[0:self.o.batch]

self.primed = True
return messages
return (True, messages)
8 changes: 5 additions & 3 deletions sarracenia/flowcb/gather/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ def __init__(self, options) -> None:

def gather(self, messageCountMax) -> list:
"""
return a current list of messages.
return:
True ... you can gather from other sources. and:
a list of messages obtained from this source.
"""
if hasattr(self,'consumer') and hasattr(self.consumer,'newMessages'):
return self.consumer.newMessages()
return (True, self.consumer.newMessages())
else:
logger.warning( f'not connected. Trying to connect to {self.o.broker}')
self.consumer = sarracenia.moth.Moth.subFactory(self.od)
return []
return (True, [])

def ack(self, mlist) -> None:

Expand Down
2 changes: 1 addition & 1 deletion sarracenia/flowcb/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def gather(self, messageCountMax):
if set(['gather']) & self.o.logEvents:
logger.info( f' messageCountMax: {messageCountMax} ')

return []
return (True, [])

def _messageStr(self, msg):
if self.o.logMessageDump:
Expand Down
1 change: 0 additions & 1 deletion sarracenia/flowcb/poll/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ class Poll(FlowCB):

* options are passed to sarracenia.Transfer classes for their use as well.


Poll uses sarracenia.transfer (ftp, sftp, https, etc... )classes to
requests lists of files using those protocols using built-in logic.

Expand Down
2 changes: 1 addition & 1 deletion sarracenia/flowcb/poll/airnow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Airnow(FlowCB):

def poll(self):

sleep = self.o.sleep
sleep = self.o.scheduled_interval

gathered_messages = []
for Hours in range(1, 3):
Expand Down
6 changes: 3 additions & 3 deletions sarracenia/flowcb/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ def gather(self, qty) -> None:

"""
if not features['retry']['present'] or not self.o.retry_refilter:
return []
return (True, [])

if qty <= 0: return []
if qty <= 0: return (True, [])

message_list = self.download_retry.get(qty)

Expand All @@ -99,7 +99,7 @@ def gather(self, qty) -> None:
m['_deleteOnPost'] = set( [ '_isRetry' ] )


return message_list
return (True, message_list)


def after_accept(self, worklist) -> None:
Expand Down
2 changes: 1 addition & 1 deletion sarracenia/flowcb/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def gather(self, messageCountMax):
if hasattr(self.o,
'run_gather') and self.o.run_gather is not None:
self.run_script(self.o.run_gather)
return []
return (True, [])

def after_accept(self, worklist):
"""
Expand Down
19 changes: 16 additions & 3 deletions sarracenia/flowcb/scheduled/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,15 @@ def __init__(self,options,logger=logger):

now=datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)
self.update_appointments(now)
self.first_interval=True

def gather(self,messageCountMax):

# for next expected post
self.wait_until_next()

if self.stop_requested or self.housekeeping_needed:
return []
return (True, [])

logger.info('time to run')

Expand All @@ -105,7 +106,7 @@ def gather(self,messageCountMax):
m = sarracenia.Message.fromFileInfo(relPath, self.o, st)
gathered_messages.append(m)

return gathered_messages
return (True, gathered_messages)

def on_housekeeping(self):

Expand Down Expand Up @@ -166,20 +167,32 @@ def wait_until( self, appointment ):
def wait_until_next( self ):

if self.o.scheduled_interval > 0:
if self.first_interval:
self.first_interval=False
return

self.wait_seconds(datetime.timedelta(seconds=self.o.scheduled_interval))
return

if ( len(self.o.scheduled_hour) > 0 ) or ( len(self.o.scheduled_minute) > 0 ):
now = datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)
next_appointment=None
missed_appointments=[]
for t in self.appointments:
if now < t:
next_appointment=t
break
else:
logger.info( f'already too late to {t} skipping' )
missed_appointments.append(t)

if missed_appointments:
for ma in missed_appointments:
self.appointments.remove(ma)

if next_appointment is None:
# done for the day...
tomorrow = datetime.date.today()+datetime.timedelta(days=1)
tomorrow = datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc)+datetime.timedelta(days=1)
midnight = datetime.time(0,0,tzinfo=datetime.timezone.utc)
midnight = datetime.datetime.combine(tomorrow,midnight)
self.update_appointments(midnight)
Expand Down
Loading
Loading