
[Bug]: KafkaReader cannot read messages #1053

Closed
Hitooooo opened this issue Jun 12, 2024 · 4 comments
Labels: bug Something isn't working

@Hitooooo

What happened?

With the following configuration, the reader cannot consume any Kafka messages:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        },
        "reader": {
          "name": "kafkareader",
          "parameter": {
            "brokerList": "localhost:9092",
            "topic": "test-1",
            "column": [
              "*"
            ],
            "missingKeyValue": "\\N",
            "properties": {
              "auto.offset.reset": "earliest"
            }
          }
        }
      }
    ]
  }
}
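
For reference, a job file like this would normally be run with Addax's shell launcher; the job file name below is only a placeholder:

bin/addax.sh job/kafka2stream.json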

Debugging shows that the consumer is subscribed, but the record count is still 0.

[screenshot]

Version

4.1.3 (Default)

OS Type

Linux (Default)

Java JDK Version

Oracle JDK 1.8.0

Relevant log output

  ___      _     _
 / _ \    | |   | |
/ /_\ \ __| | __| | __ ___  __
|  _  |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |>  <
\_| |_/\__,_|\__,_|\__,_/_/\_\

:: Addax version ::    (v4.1.4)

2024-06-12 19:16:22.105 [        main] INFO  VMInfo               - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2024-06-12 19:16:22.155 [        main] INFO  Engine               -
{
        "setting":{
                "speed":{
                        "channel":1
                }
        },
        "content":{
                "reader":{
                        "name":"kafkareader",
                        "parameter":{
                                "brokerList":"localhost:9092",
                                "topic":"test-1",
                                "column":[
                                        "*"
                                ],
                                "missingKeyValue":"\\N",
                                "properties":{
                                        "auto.offset.reset":"earliest"
                                }
                        }
                },
                "writer":{
                        "name":"streamwriter",
                        "parameter":{
                                "print":true
                        }
                }
        }
}

2024-06-12 19:16:22.178 [        main] INFO  JobContainer         - The jobContainer begins to process the job.
2024-06-12 19:16:22.196 [       job-0] INFO  JobContainer         - The Reader.Job [kafkareader] perform prepare work .
2024-06-12 19:16:22.196 [       job-0] INFO  JobContainer         - The Writer.Job [streamwriter] perform prepare work .
2024-06-12 19:16:22.197 [       job-0] INFO  JobContainer         - Job set Channel-Number to 1 channel(s).
2024-06-12 19:16:22.198 [       job-0] INFO  JobContainer         - The Reader.Job [kafkareader] is divided into [1] task(s).
2024-06-12 19:16:22.198 [       job-0] INFO  JobContainer         - The Writer.Job [streamwriter] is divided into [1] task(s).
2024-06-12 19:16:22.224 [       job-0] INFO  JobContainer         - The Scheduler launches [1] taskGroup(s).
2024-06-12 19:16:22.230 [ taskGroup-0] INFO  TaskGroupContainer   - The taskGroupId=[0] started [1] channels for [1] tasks.
2024-06-12 19:16:22.232 [ taskGroup-0] INFO  Channel              - The Channel set byte_speed_limit to -1, No bps activated.
2024-06-12 19:16:22.232 [ taskGroup-0] INFO  Channel              - The Channel set record_speed_limit to -1, No tps activated.
2024-06-12 19:16:22.331 [  reader-0-0] INFO  ConsumerConfig       - ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = addax-kafka-reader
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = addax-kafka-grp
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 45000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2024-06-12 19:16:22.485 [  reader-0-0] INFO  AppInfoParser        - Kafka version: 3.2.3
2024-06-12 19:16:22.485 [  reader-0-0] INFO  AppInfoParser        - Kafka commitId: 50029d3ed8ba576f
2024-06-12 19:16:22.485 [  reader-0-0] INFO  AppInfoParser        - Kafka startTimeMs: 1718190982484
2024-06-12 19:16:22.487 [  reader-0-0] INFO  KafkaConsumer        - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Subscribed to topic(s): test-1
2024-06-12 19:16:22.962 [  reader-0-0] INFO  Metadata             - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Resetting the last seen epoch of partition test-1-0 to 0 since the associated topicId changed from null to IF2sFY2LQmunYLRjP2R2Vw
2024-06-12 19:16:22.969 [  reader-0-0] INFO  Metadata             - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Cluster ID: Some(5L6g3nShT-eMCtK--X86sw)
2024-06-12 19:16:22.971 [  reader-0-0] INFO  ConsumerCoordinator  - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2024-06-12 19:16:22.978 [  reader-0-0] INFO  ConsumerCoordinator  - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] (Re-)joining group
2024-06-12 19:16:23.012 [  reader-0-0] INFO  ConsumerCoordinator  - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Request joining group due to: need to re-join with the given member-id: addax-kafka-reader-c06d3cdb-796d-4353-9f0f-b69fc574885c
2024-06-12 19:16:23.013 [  reader-0-0] INFO  ConsumerCoordinator  - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
2024-06-12 19:16:23.013 [  reader-0-0] INFO  ConsumerCoordinator  - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] (Re-)joining group
2024-06-12 19:16:26.045 [  reader-0-0] INFO  ConsumerCoordinator  - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Successfully joined group with generation Generation{generationId=29, memberId='addax-kafka-reader-c06d3cdb-796d-4353-9f0f-b69fc574885c', protocol='range'}
2024-06-12 19:16:26.048 [  reader-0-0] INFO  ConsumerCoordinator  - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Finished assignment for group at generation 29: {addax-kafka-reader-c06d3cdb-796d-4353-9f0f-b69fc574885c=Assignment(partitions=[test-1-0])}
2024-06-12 19:16:26.069 [  reader-0-0] INFO  ConsumerCoordinator  - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Successfully synced group in generation Generation{generationId=29, memberId='addax-kafka-reader-c06d3cdb-796d-4353-9f0f-b69fc574885c', protocol='range'}
2024-06-12 19:16:26.070 [  reader-0-0] INFO  ConsumerCoordinator  - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Notifying assignor about the new Assignment(partitions=[test-1-0])
2024-06-12 19:16:26.075 [  reader-0-0] INFO  ConsumerCoordinator  - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Adding newly assigned partitions: test-1-0
2024-06-12 19:16:26.091 [  reader-0-0] INFO  ConsumerCoordinator  - [Consumer clientId=addax-kafka-reader, groupId=addax-kafka-grp] Setting offset for partition test-1-0 to the committed offset FetchPosition{offset=8, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}
2024-06-12 19:16:34.248 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 0.00%
2024-06-12 19:16:46.249 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 0.00%
Hitooooo added the bug (Something isn't working) label on Jun 12, 2024
@wgzhao
Owner

wgzhao commented Jun 12, 2024

Test with the command below to see whether the contents of this topic can be read:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
   --topic test-1 --from-beginning

@Hitooooo
Author

Yes, it can be read:
[screenshot]

@wgzhao
Owner

wgzhao commented Jun 12, 2024

If you keep the Addax job running and keep writing data into the test-1 topic, do you see any output?
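
For reference, a simple way to keep writing test data into the topic, assuming the same standard Kafka CLI tools used above, is the console producer (each line typed is sent as a separate message):

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 \
   --topic test-1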

@Hitooooo
Author

[screenshot]
I added two log lines; they show that messages from the topic are being read, but nothing comes out on the write side.

wgzhao closed this as completed in 0750c8b on Sep 24, 2024