
Add streaming to acall #158

Open · wants to merge 3 commits into main

Conversation

mneedham

@liyin2015 I played around with how to add streaming to the acall function, but I don't know whether this is the right way to do it, as I'm a newbie when it comes to using async.

So I've just implemented it for the Ollama Client for the time being.

Let me know what you think?
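For context, here is a minimal sketch of the kind of client-side change this aims at, assuming the ollama Python package's AsyncClient and a simplified client interface. The class name OllamaClientSketch and its internals are illustrative only and may differ from the actual diff in this PR:

# Hypothetical sketch, not the actual PR diff: return an async generator of
# text chunks when stream=True is requested, otherwise the full response text.
from typing import Any, AsyncGenerator, Dict, Union

import ollama


class OllamaClientSketch:
    def __init__(self, host: str = "http://localhost:11434"):
        self.async_client = ollama.AsyncClient(host=host)

    async def acall(
        self, api_kwargs: Dict[str, Any], model_type: Any = None
    ) -> Union[str, AsyncGenerator[str, None]]:
        # with stream=True, ollama's AsyncClient.generate returns an async
        # iterator of chunk dicts; each chunk carries a "response" text field
        response = await self.async_client.generate(**api_kwargs)
        if api_kwargs.get("stream"):

            async def text_chunks() -> AsyncGenerator[str, None]:
                async for chunk in response:
                    yield chunk["response"]

            return text_chunks()
        return response["response"]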

@mneedham
Author

And here's how you use it:

import asyncio
# (plus the Generator and OllamaClient imports from this library)

async def my_fn(stream_response=True):
    model_client = OllamaClient(host="http://localhost:11434")
    model_kwargs = {"model": "llama3.1", "stream": stream_response}
    generator = Generator(model_client=model_client, model_kwargs=model_kwargs)

    response = await generator.acall({"input_str": "What would happen if a lion and an elephant met three dogs and four hyenas?"})
    if stream_response:
        async for chunk in response.data:
            print(chunk, end='', flush=True)
    else:
        print(response.data)

asyncio.run(my_fn())

The streamed output looks like this:

That's quite an interesting scenario!

If a lion and an elephant were to meet with three dogs and four hyenas, I think the outcome would depend on various factors such as the size and ferocity of each individual animal.

Initially, the lion might try to assert its dominance over the smaller animals (the three dogs). However, the presence of the elephant and the hyenas could change the dynamics. Elephants are known for their strength and memory, so they might be wary of the lion's intentions.

The four hyenas, being scavengers and often seen as a pack, would likely try to capitalize on any potential chaos. They're known for their cunning and ability to work together, which could put them at an advantage in this scenario.

The three dogs, depending on their breed and temperament, might either run away or try to join forces with the hyenas against the lion and elephant.

Ultimately, it would likely be a chaotic scene, with each animal trying to protect itself. The size and strength of the lion and elephant would initially give them an advantage, but if the dogs and hyenas were able to work together, they might be able to drive out or distract the larger predators, creating an opportunity for escape or counterattack.

Of course, this is all speculation, and in reality, such a scenario would likely play out differently depending on various factors like habitat, weather conditions, and individual animal personalities!

mneedham changed the title from "PoC to add streaming to acall" to "Add streaming to acall" on Jul 25, 2024
@mneedham
Author

@liyin2015 Added tests around this code.

@mneedham
Author

@liyin2015 I don't think the acall streaming works for the other clients either? I'll try to go through the others and do the same thing.
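For illustration, extending the same pattern to an OpenAI-backed client could look roughly like this, using the official openai package's AsyncOpenAI streaming API. OpenAIClientSketch and its acall are hypothetical and not this repo's actual OpenAI client:

# Hypothetical sketch of the same streaming idea for an OpenAI-style client.
from typing import Any, AsyncGenerator, Dict, Union

from openai import AsyncOpenAI


class OpenAIClientSketch:
    def __init__(self):
        self.async_client = AsyncOpenAI()

    async def acall(
        self, api_kwargs: Dict[str, Any], model_type: Any = None
    ) -> Union[str, AsyncGenerator[str, None]]:
        if api_kwargs.get("stream"):
            # with stream=True, chat.completions.create yields chunks whose
            # choices[0].delta.content holds the next piece of text
            stream = await self.async_client.chat.completions.create(**api_kwargs)

            async def text_chunks() -> AsyncGenerator[str, None]:
                async for chunk in stream:
                    if chunk.choices and chunk.choices[0].delta.content:
                        yield chunk.choices[0].delta.content

            return text_chunks()
        completion = await self.async_client.chat.completions.create(**api_kwargs)
        return completion.choices[0].message.content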

@@ -263,8 +286,8 @@ async def acall(
        api_kwargs = self._pre_call(prompt_kwargs, model_kwargs)
        completion = await self.model_client.acall(
            api_kwargs=api_kwargs, model_type=self.model_type
Member

I'm thinking we need to have new functions, self.model_client_call and model_client_acall, to call and parse the completion. This way, the pre_call and post_call don't need to be async for now.

Member

I'm working on something and also need to separate it:

    def _model_client_call(self, api_kwargs: Dict) -> Any:
        # call the model client
        try:
            # check the cache
            index_content = json.dumps(api_kwargs)  # all messages
            cached_completion = self._check_cache(index_content)
            if cached_completion is not None:
                return cached_completion
            completion = self.model_client.call(
                api_kwargs=api_kwargs, model_type=self.model_type
            )
            # prepare cache
            self._save_cache(index_content, completion)
            return completion
        except Exception as e:
            log.error(f"Error calling the model: {e}")
            raise e

You can use this minus the cache. Here is how to use it in the call:

        output: GeneratorOutputType = None
        # call the model client
        completion = None
        try:
            completion = self._model_client_call(api_kwargs=api_kwargs)
        except Exception as e:
            log.error(f"Error calling the model: {e}")
            output = GeneratorOutput(error=str(e))
        # process the completion
        if completion:
            try:
                output = self._post_call(completion)
            except Exception as e:
                log.error(f"Error processing the output: {e}")
                output = GeneratorOutput(raw_response=str(completion), error=str(e))
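Presumably the async path would mirror this with a _model_client_acall (the name follows the suggestion above) wrapped in the same error handling inside acall. A rough sketch under that assumption, minus the cache; when streaming, the completion is an async generator, so it could be returned directly (for example as GeneratorOutput(data=completion)) rather than going through _post_call:

    async def _model_client_acall(self, api_kwargs: Dict) -> Any:
        # async counterpart of _model_client_call above (cache omitted)
        try:
            completion = await self.model_client.acall(
                api_kwargs=api_kwargs, model_type=self.model_type
            )
            return completion
        except Exception as e:
            log.error(f"Error calling the model: {e}")
            raise e

and inside acall:

        output: GeneratorOutputType = None
        completion = None
        try:
            completion = await self._model_client_acall(api_kwargs=api_kwargs)
        except Exception as e:
            log.error(f"Error calling the model: {e}")
            output = GeneratorOutput(error=str(e))
        # process the completion (a streaming completion could bypass this step)
        if completion:
            try:
                output = self._post_call(completion)
            except Exception as e:
                log.error(f"Error processing the output: {e}")
                output = GeneratorOutput(raw_response=str(completion), error=str(e))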

Member

liyin2015 left a comment

@mneedham thanks for the PR, it's great work. Only one change will be needed.

@mneedham
Author

mneedham commented Aug 9, 2024

@liyin2015 Is the change that you mentioned committed now? I guess I can pull down your changes and fix this PR

@liyin2015
Member

@mneedham sorry, you'll have to do a rebase now, and I will finish reviewing this time. It's really close.
