Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove a broken link in Readme.md #332

Merged
merged 5 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,15 @@ introduced in Python 3.6 (`async`, `await`, variable type annotations).

### I get a "maximum number of open files exceeded" error from RocksDB when running a Faust app locally. How can I fix this?

You may need to increase the limit for the maximum number of open files. The
following post explains how to do so on OS X: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
You may need to increase the limit for the maximum number of open files.
On macOS and Linux you can use:

```ulimit -n max_open_files``` to raise the open-files limit to `max_open_files`.

In Docker, you can use the `--ulimit` flag:

```docker run --ulimit nofile=50000:100000 <image-tag>```
where 50000 is the soft limit and 100000 is the hard limit ([see the difference](https://unix.stackexchange.com/a/29579)).

### What Kafka versions does Faust support?

Expand Down
19 changes: 3 additions & 16 deletions faust/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,27 +202,14 @@ def _extract_arg_from_argv( # pragma: no cover
"faust.models": ["ModelOptions", "Record"],
"faust.sensors": ["Monitor", "Sensor"],
"faust.serializers": ["Codec", "Schema"],
"faust.streams": [
"Stream",
"StreamT",
"current_event",
],
"faust.streams": ["Stream", "StreamT", "current_event",],
"faust.tables.globaltable": ["GlobalTable"],
"faust.tables.sets": ["SetTable", "SetGlobalTable"],
"faust.tables.table": ["Table"],
"faust.topics": ["Topic", "TopicT"],
"faust.auth": [
"GSSAPICredentials",
"SASLCredentials",
"SSLCredentials",
],
"faust.auth": ["GSSAPICredentials", "SASLCredentials", "SSLCredentials",],
"faust.types.settings": ["Settings"],
"faust.windows": [
"HoppingWindow",
"TumblingWindow",
"SlidingWindow",
"Window",
],
"faust.windows": ["HoppingWindow", "TumblingWindow", "SlidingWindow", "Window",],
"faust.worker": ["Worker"],
"faust.utils": ["uuid"],
"mode.services": ["Service", "ServiceT"],
Expand Down
26 changes: 7 additions & 19 deletions faust/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,7 @@ async def _start_one_supervised(
stream: Optional[StreamT] = None,
) -> ActorT:
aref = await self._start_one(
index=index,
active_partitions=active_partitions,
stream=stream,
index=index, active_partitions=active_partitions, stream=stream,
)
self.supervisor.add(aref)
await aref.maybe_start()
Expand Down Expand Up @@ -312,9 +310,7 @@ async def _on_start_supervisor(self) -> None:
channel: ChannelT = cast(ChannelT, None)
for i in range(self.concurrency):
res = await self._start_one(
index=i,
active_partitions=active_partitions,
channel=channel,
index=i, active_partitions=active_partitions, channel=channel,
wbarnha marked this conversation as resolved.
Show resolved Hide resolved
)
if channel is None:
# First concurrency actor creates channel,
Expand Down Expand Up @@ -609,8 +605,7 @@ def stream(
"""Create underlying stream used by this agent."""
if channel is None:
channel = cast(TopicT, self.channel_iterator).clone(
is_iterator=False,
active_partitions=active_partitions,
is_iterator=False, active_partitions=active_partitions,
)
if active_partitions is not None:
assert channel.active_partitions == active_partitions
Expand Down Expand Up @@ -733,14 +728,10 @@ async def _reply(
) -> None:
assert reply_to
response = self._response_class(value)(
key=key,
value=value,
correlation_id=correlation_id,
key=key, value=value, correlation_id=correlation_id,
)
await self.app.send(
reply_to,
key=None,
value=response,
reply_to, key=None, value=response,
)

def _response_class(self, value: Any) -> Type[ReqRepResponse]:
Expand Down Expand Up @@ -858,9 +849,7 @@ def _create_req(
else:
# wrap value in envelope
req = self._request_class(value)(
value=value,
reply_to=topic_name,
correlation_id=correlation_id,
value=value, reply_to=topic_name, correlation_id=correlation_id,
)
return req, open_headers

Expand Down Expand Up @@ -971,8 +960,7 @@ async def join(
all values have been processed.
"""
return await self.kvjoin(
((key, value) async for value in aiter(values)),
reply_to=reply_to,
((key, value) async for value in aiter(values)), reply_to=reply_to,
)

async def kvjoin(
Expand Down
5 changes: 1 addition & 4 deletions faust/agents/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ def human_tracebacks(self) -> str:
[
self.traceback_header,
"\n".join(
self.traceback_format.format(
name=name,
traceback=traceback,
)
self.traceback_format.format(name=name, traceback=traceback,)
for name, traceback in self.actor_tracebacks().items()
),
self.traceback_footer,
Expand Down
35 changes: 8 additions & 27 deletions faust/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ class _Worker:
# This means @app.task attempts to do the right thing depending
# on how it's used. All the frameworks do this, but we have to also type it.
TaskDecoratorRet = Union[
Callable[[TaskArg], TaskArg],
TaskArg,
Callable[[TaskArg], TaskArg], TaskArg,
]


Expand Down Expand Up @@ -294,10 +293,7 @@ def client_only(self) -> Iterable[ServiceT]:

def producer_only(self) -> Iterable[ServiceT]:
"""Return services to start when app is in producer_only mode."""
return self._chain(
self.web_server(),
self.kafka_producer(),
)
return self._chain(self.web_server(), self.kafka_producer(),)

def _chain(self, *arguments: Iterable[ServiceT]) -> Iterable[ServiceT]:
return cast(Iterable[ServiceT], chain.from_iterable(arguments))
Expand Down Expand Up @@ -633,10 +629,7 @@ async def on_init_extra_service(

def _prepare_subservice(self, service: Union[ServiceT, Type[ServiceT]]) -> ServiceT:
if inspect.isclass(service):
return cast(Type[ServiceT], service)(
loop=self.loop,
beacon=self.beacon,
)
return cast(Type[ServiceT], service)(loop=self.loop, beacon=self.beacon,)
else:
return cast(ServiceT, service)

Expand Down Expand Up @@ -948,10 +941,7 @@ def _inner(fun: TaskArg) -> TaskArg:
return _inner(fun) if fun is not None else _inner

def _task(
self,
fun: TaskArg,
on_leader: bool = False,
traced: bool = False,
self, fun: TaskArg, on_leader: bool = False, traced: bool = False,
) -> TaskArg:
app = self

Expand Down Expand Up @@ -1475,8 +1465,7 @@ def _start_span_from_rebalancing(self, name: str) -> opentracing.Span:
if rebalancing_span is not None and self.tracer is not None:
category = f"{self.conf.name}-_faust"
span = self.tracer.get_tracer(category).start_span(
operation_name=name,
child_of=rebalancing_span,
operation_name=name, child_of=rebalancing_span,
)
self._span_add_default_tags(span)
set_current_span(span)
Expand Down Expand Up @@ -1818,10 +1807,7 @@ def _new_cache_backend(self) -> CacheBackendT:
)

def FlowControlQueue(
self,
maxsize: Optional[int] = None,
*,
clear_on_resume: bool = False,
self, maxsize: Optional[int] = None, *, clear_on_resume: bool = False,
) -> ThrowableQueue:
"""Like :class:`asyncio.Queue`, but can be suspended/resumed."""
return ThrowableQueue(
Expand Down Expand Up @@ -1854,10 +1840,7 @@ def __repr__(self) -> str:
id=id(self),
)
else:
return APP_REPR_UNFINALIZED.format(
name=type(self).__name__,
id=id(self),
)
return APP_REPR_UNFINALIZED.format(name=type(self).__name__, id=id(self),)

def _configure(self, *, silent: bool = False) -> None:
self.on_before_configured.send()
Expand Down Expand Up @@ -1987,9 +1970,7 @@ def cache(self, cache: CacheBackendT) -> None:
def tables(self) -> TableManagerT:
"""Map of available tables, and the table manager service."""
manager = self.conf.TableManager( # type: ignore
app=self,
loop=self.loop,
beacon=self.beacon,
app=self, loop=self.loop, beacon=self.beacon,
)
return cast(TableManagerT, manager)

Expand Down
5 changes: 1 addition & 4 deletions faust/assignor/client_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ def can_assign(self, partition: int, active: bool) -> bool:
)

def __repr__(self) -> str:
return R_COPART_ASSIGNMENT.format(
name=type(self).__name__,
self=self,
)
return R_COPART_ASSIGNMENT.format(name=type(self).__name__, self=self,)


class ClientAssignment(
Expand Down
4 changes: 1 addition & 3 deletions faust/assignor/copartitioned_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ def _client_exhausted(
return assignemnt.num_assigned(active) == client_limit

def _find_promotable_standby(
self,
partition: int,
candidates: Iterator[CopartitionedAssignment],
self, partition: int, candidates: Iterator[CopartitionedAssignment],
) -> Optional[CopartitionedAssignment]:
# Round robin to find standby until we make a full cycle
for _ in range(self._num_clients):
Expand Down
4 changes: 1 addition & 3 deletions faust/assignor/partition_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ def metadata(self, topics: Set[str]) -> ConsumerProtocolMemberMetadata:

@classmethod
def _group_co_subscribed(
cls,
topics: Set[str],
subscriptions: MemberSubscriptionMapping,
cls, topics: Set[str], subscriptions: MemberSubscriptionMapping,
) -> Iterable[Set[str]]:
topic_subscriptions: MutableMapping[str, Set[str]] = defaultdict(set)
for client, subscription in subscriptions.items():
Expand Down
9 changes: 2 additions & 7 deletions faust/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ def __init__(

def __repr__(self) -> str:
return "<{0}: kerberos service={1!r} domain={2!r}".format(
type(self).__name__,
self.kerberos_service_name,
self.kerberos_domain_name,
type(self).__name__, self.kerberos_service_name, self.kerberos_domain_name,
)


Expand All @@ -100,10 +98,7 @@ def __init__(
) -> None:
if context is None:
context = ssl.create_default_context(
purpose=purpose,
cafile=cafile,
capath=capath,
cadata=cadata,
purpose=purpose, cafile=cafile, capath=capath, cadata=cadata,
)
self.context = context

Expand Down
12 changes: 3 additions & 9 deletions faust/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ def _get_default_schema(
return cast(
SchemaT,
self.app.conf.Schema( # type: ignore
key_type=key_type,
value_type=value_type,
key_type=key_type, value_type=value_type,
),
)

Expand All @@ -140,8 +139,7 @@ def queue(self) -> ThrowableQueue:
if maxsize is None:
maxsize = self.app.conf.stream_buffer_maxsize
self._queue = self.app.FlowControlQueue(
maxsize=maxsize,
clear_on_resume=True,
maxsize=maxsize, clear_on_resume=True,
)
return self._queue

Expand Down Expand Up @@ -616,11 +614,7 @@ def __init__(
)

super().__init__(
app,
schema=schema,
key_type=key_type,
value_type=value_type,
**kwargs,
app, schema=schema, key_type=key_type, value_type=value_type, **kwargs,
)
self.key_serializer = self.schema.key_serializer
self.value_serializer = self.schema.value_serializer
Expand Down
5 changes: 1 addition & 4 deletions faust/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,7 @@ def banner(self, worker: Worker) -> str:
return self._format_banner_table(self._banner_data(worker))

def _format_banner_table(self, data: TableDataT) -> str:
table = self.table(
[(x, str(y)) for x, y in data],
title=self._banner_title(),
)
table = self.table([(x, str(y)) for x, y in data], title=self._banner_title(),)
table.inner_heading_row_border = False
table.inner_row_border = False
return table.table
Expand Down
7 changes: 2 additions & 5 deletions faust/contrib/sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,7 @@ def handler_from_dsn(
dsn=dsn,
include_paths=include_paths,
transport=partial(
raven_aiohttp.QueuedAioHttpTransport,
workers=workers,
qsize=qsize,
raven_aiohttp.QueuedAioHttpTransport, workers=workers, qsize=qsize,
),
disable_existing_loggers=False,
**kwargs
Expand Down Expand Up @@ -141,7 +139,6 @@ def setup(
"faust.contrib.sentry requires the `sentry_sdk` library."
)
sentry_sdk.init(
dsn=dsn,
integrations=[_sdk_aiohttp.AioHttpIntegration()],
dsn=dsn, integrations=[_sdk_aiohttp.AioHttpIntegration()],
)
app.conf.loghandlers.append(sentry_handler)
Loading