Exception thrown on faust-streaming/faust but not on robinhood/faust #101

Closed
2 tasks done
alxdb opened this issue Feb 14, 2021 · 2 comments
@alxdb

alxdb commented Feb 14, 2021

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I have a single-module app that works correctly with the robinhood version of faust but does not work with this fork. Namely, the exception "on_rebalance() takes 5 positional arguments but 6 were given" is thrown on app startup with faust-streaming but not with faust. The relevant versions are 0.4.8 466dbf2 (git master at time of writing) and 1.10.4, respectively. The single-module app and the exception are as follows:

from typing import Any

import faust


class UserUpdate(faust.Record):
    user_id: int
    event_type: str
    event_data: Any


app = faust.App("faust-demo", broker="kafka://kafka-1:9092")
user_updates = app.topic("user_updates", value_type=UserUpdate, partitions=8)
users = app.Table("user", partitions=8)


@app.agent(user_updates)
async def process_user_updates(updates: faust.StreamT[UserUpdate]):
    async for update in updates:
        user = users.get(update.user_id)
        if user:
            user[update.event_type] = update.event_data
            users[update.user_id] = user
        else:
            users[update.user_id] = {update.event_type: update.event_data}


@app.page("/users")
async def get_users(self, request):
    return self.json(users)

Expected behavior

No exception is thrown on app startup.

Actual behavior

The exception is thrown on app startup.

Full traceback

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/faust/app/base.py", line 1720, in _on_partitions_assigned
    await T(self.tables.on_rebalance)(
  File "/usr/local/lib/python3.9/site-packages/faust/utils/tracing.py", line 133, in corowrapped
    await_ret = await ret
  File "/usr/local/lib/python3.9/site-packages/faust/tables/manager.py", line 194, in on_rebalance
    await T(table.on_rebalance)(
  File "/usr/local/lib/python3.9/site-packages/faust/utils/tracing.py", line 133, in corowrapped
    await_ret = await ret
  File "/usr/local/lib/python3.9/site-packages/faust/tables/base.py", line 571, in on_rebalance
    await self.data.on_rebalance(
TypeError: on_rebalance() takes 5 positional arguments but 6 were given
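
For readers unfamiliar with this class of error, the following is a minimal sketch of how such a TypeError can arise when a caller forwards one more positional argument than the callee's signature accepts. It is not the Faust source: the Store and Table classes, the `extra` parameter, and the `demo` helper are hypothetical names chosen only for illustration, and the actual cause in the fork may differ in detail.

```python
import asyncio


class Store:
    # Hypothetical store: 'self' plus four parameters, so the method
    # takes 5 positional arguments in total.
    async def on_rebalance(self, table, assigned, revoked, newly_assigned):
        return "ok"


class Table:
    def __init__(self, store):
        self.data = store

    async def on_rebalance(self, assigned, revoked, newly_assigned, extra=0):
        # Hypothetical caller that forwards one argument too many:
        # the bound method receives 'self' implicitly plus five explicit
        # arguments, which makes 6 positional arguments.
        return await self.data.on_rebalance(
            self, assigned, revoked, newly_assigned, extra
        )


def demo():
    """Run the mismatched call and return the error message, if any."""
    try:
        asyncio.run(Table(Store()).on_rebalance(set(), set(), set()))
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(demo())
```

The error is raised when the coroutine function is called, before anything is awaited, which is why the traceback points at the call site in the caller rather than inside the callee. Dropping the extra argument at the call site, or widening the callee's signature, would make the two sides agree in this sketch.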

Versions

  • python: 3.9.1
  • faust: 0.4.8 466dbf2 (master)
  • os: debian (python:3.9 docker image)
  • kafka: 2.7.0
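
Because both projects are imported as `faust`, it can help to confirm which distribution is actually installed in the environment when comparing the two. A small sketch using the standard library follows; it assumes the PyPI distribution names `faust` and `faust-streaming`, and the helper name `installed_faust_distributions` is hypothetical.

```python
from importlib import metadata


def installed_faust_distributions(names=("faust", "faust-streaming")):
    """Return a {distribution name: version} dict for the names that are installed."""
    found = {}
    for name in names:
        try:
            found[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # Distribution is not installed in this environment; skip it.
            pass
    return found


if __name__ == "__main__":
    print(installed_faust_distributions())
```

If both names show up, the environment has the two distributions installed side by side, and the imported `faust` package may not be the one intended.
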

@alxdb alxdb changed the title from "Unexpected Exception: on_rebalance() takes 5 positional arguments but 6 were given" to "Exception thrown on faust-streaming/faust but not on robinhood/faust" on Feb 14, 2021
@patkivikram
Collaborator

I am unable to reproduce this on the latest master. Can you double check?

@alxdb
Author

alxdb commented Feb 15, 2021

I have double-checked. At first I thought it might be due to using multiple brokers, but that appears not to be the case. I've created a repo that should reproduce the issue: faust-demo/faust-streaming. If you have docker/docker-compose installed, could you try it out and see if it reproduces the issue?

@alxdb alxdb closed this as completed Feb 15, 2021
@alxdb alxdb reopened this Feb 15, 2021
@bobh66 bobh66 self-assigned this Feb 18, 2021
bobh66 added a commit to bobh66/faust-1 that referenced this issue Feb 18, 2021
bobh66 added a commit that referenced this issue Feb 19, 2021
* fix extra parameter in on_rebalance call (#101)
@bobh66 bobh66 closed this as completed Feb 19, 2021