You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
packageorg.apache.kafka.clients.producer;
importorg.apache.kafka.common.header.Header;
importorg.apache.kafka.common.header.Headers;
importorg.apache.kafka.common.header.internals.RecordHeaders;
importjava.util.Objects;
/** * A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional * partition number, and an optional key and value. * <p> * If a valid partition number is specified that partition will be used when sending the record. If no partition is * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is * present a partition will be assigned in a round-robin fashion. * <p> * The record also has an associated timestamp. If the user did not provide a timestamp, the producer will stamp the * record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for * the topic. * <li> * If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}, * the timestamp in the producer record will be used by the broker. * </li> * <li> * If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime}, * the timestamp in the producer record will be overwritten by the broker with the broker local time when it appends the * message to its log. * </li> * <p> * In either of the cases above, the timestamp that has actually been used will be returned to user in * {@link RecordMetadata} */publicclassProducerRecord<K, V> {
privatefinalStringtopic;
privatefinalIntegerpartition;
privatefinalHeadersheaders;
privatefinalKkey;
privatefinalVvalue;
privatefinalLongtimestamp;
/** * Creates a record with a specified timestamp to be sent to a specified topic and partition * * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign * the timestamp using System.currentTimeMillis(). * @param key The key that will be included in the record * @param value The record contents * @param headers the headers that will be included in the record */publicProducerRecord(Stringtopic, Integerpartition, Longtimestamp, Kkey, Vvalue, Iterable<Header> headers) {
if (topic == null)
thrownewIllegalArgumentException("Topic cannot be null.");
if (timestamp != null && timestamp < 0)
thrownewIllegalArgumentException(
String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
if (partition != null && partition < 0)
thrownewIllegalArgumentException(
String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.headers = newRecordHeaders(headers);
}
/** * Creates a record with a specified timestamp to be sent to a specified topic and partition * * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent * @param timestamp The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the * timestamp using System.currentTimeMillis(). * @param key The key that will be included in the record * @param value The record contents */publicProducerRecord(Stringtopic, Integerpartition, Longtimestamp, Kkey, Vvalue) {
this(topic, partition, timestamp, key, value, null);
}
/** * Creates a record to be sent to a specified topic and partition * * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent * @param key The key that will be included in the record * @param value The record contents * @param headers The headers that will be included in the record */publicProducerRecord(Stringtopic, Integerpartition, Kkey, Vvalue, Iterable<Header> headers) {
this(topic, partition, null, key, value, headers);
}
/** * Creates a record to be sent to a specified topic and partition * * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent * @param key The key that will be included in the record * @param value The record contents */publicProducerRecord(Stringtopic, Integerpartition, Kkey, Vvalue) {
this(topic, partition, null, key, value, null);
}
/** * Create a record to be sent to Kafka * * @param topic The topic the record will be appended to * @param key The key that will be included in the record * @param value The record contents */publicProducerRecord(Stringtopic, Kkey, Vvalue) {
this(topic, null, null, key, value, null);
}
/** * Create a record with no key * * @param topic The topic this record should be sent to * @param value The record contents */publicProducerRecord(Stringtopic, Vvalue) {
this(topic, null, null, null, value, null);
}
/** * @return The topic this record is being sent to */publicStringtopic() {
returntopic;
}
/** * @return The headers */publicHeadersheaders() {
returnheaders;
}
/** * @return The key (or null if no key is specified) */publicKkey() {
returnkey;
}
/** * @return The value */publicVvalue() {
returnvalue;
}
/** * @return The timestamp, which is in milliseconds since epoch. */publicLongtimestamp() {
returntimestamp;
}
/** * @return The partition to which the record will be sent (or null if no partition was specified) */publicIntegerpartition() {
returnpartition;
}
@OverridepublicStringtoString() {
Stringheaders = this.headers == null ? "null" : this.headers.toString();
Stringkey = this.key == null ? "null" : this.key.toString();
Stringvalue = this.value == null ? "null" : this.value.toString();
Stringtimestamp = this.timestamp == null ? "null" : this.timestamp.toString();
return"ProducerRecord(topic=" + topic + ", partition=" + partition + ", headers=" + headers + ", key=" + key + ", value=" + value +
", timestamp=" + timestamp + ")";
}
@Overridepublicbooleanequals(Objecto) {
if (this == o)
returntrue;
elseif (!(oinstanceofProducerRecord))
returnfalse;
ProducerRecord<?, ?> that = (ProducerRecord<?, ?>) o;
returnObjects.equals(key, that.key) &&
Objects.equals(partition, that.partition) &&
Objects.equals(topic, that.topic) &&
Objects.equals(headers, that.headers) &&
Objects.equals(value, that.value) &&
Objects.equals(timestamp, that.timestamp);
}
@OverridepublicinthashCode() {
intresult = topic != null ? topic.hashCode() : 0;
result = 31 * result + (partition != null ? partition.hashCode() : 0);
result = 31 * result + (headers != null ? headers.hashCode() : 0);
result = 31 * result + (key != null ? key.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
returnresult;
}
}
由于
The text was updated successfully, but these errors were encountered:
请描述您的问题
询问有关本项目的使用和其他方面的相关问题。
我想使用JSON序列化和反序列化对象org.apache.kafka.clients.producer.ProducerRecord,他没有get set方法,所以直接使用
JSON.toJSONString(record)
是行不通的。由于他在二进制包org.apache.kafka:afka-clients中,我无法修改其源代码添加@JSONFiled注解,所以我想问一下,有没有什么好的方式,在不修改也不使用继承对象的方式,直接序列化ProducerRecord类。比如有没有api可以自定义序列化ProducerRecord对象的逻辑
源代码文件如下:
由于
The text was updated successfully, but these errors were encountered: