
Persistent queues corruption with LogStash::Timestamp datatype #13253

@rocco8620

Description

Logstash information:

  1. Logstash version: 7.15.0
  2. Logstash installation source: DEB package from APT
  3. How is Logstash being run: systemd

Plugins installed: Default ones + logstash-filter-json_encode

JVM:
openjdk version "16.0.1" 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-16.0.1+9 (build 16.0.1+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-16.0.1+9 (build 16.0.1+9, mixed mode, sharing)

OS version: Linux lisp-hz-siem-00 4.19.0-16-amd64 #1 SMP Debian 4.19.181-1 (2021-03-19) x86_64 GNU/Linux

Description of the problem including expected versus actual behavior:

I am experiencing a problem with the persistent queues, specifically queue corruption.
After Logstash has been running for some time (minutes) and is restarted, one or more pipelines (usually just one) fail to start with the following exception:

[2021-09-23T10:03:27,175][ERROR][logstash.javapipeline    ][output-elasticsearch] Pipeline worker error, the pipeline will be stopped 
{
    :pipeline_id=>"output-elasticsearch",
    :error=>"deserialize invocation error",
    :exception=>Java::OrgLogstashAckedqueue::QueueRuntimeException, 
    :backtrace=>[
        "org.logstash.ackedqueue.Queue.deserialize(Queue.java:701)",
        "org.logstash.ackedqueue.Batch.deserializeElements(Batch.java:89)",
        "org.logstash.ackedqueue.Batch.<init>(Batch.java:49)",
        "org.logstash.ackedqueue.Queue.readPageBatch(Queue.java:630)",
        "org.logstash.ackedqueue.Queue.readBatch(Queue.java:563)",
        "org.logstash.ackedqueue.ext.JRubyAckedQueueExt.readBatch(JRubyAckedQueueExt.java:150)",
        "org.logstash.ackedqueue.AckedReadBatch.create(AckedReadBatch.java:49)",
        "org.logstash.ext.JrubyAckedReadClientExt.readBatch(JrubyAckedReadClientExt.java:87)",
        "org.logstash.execution.WorkerLoop.run(WorkerLoop.java:82)",
        "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)",
        "java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)",
        "java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)",
        "java.base/java.lang.reflect.Method.invoke(Method.java:566)",
        "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:441)",
        "org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:305)",
        "org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)",
        "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_workers$5(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:295)",
        "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)",
        "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)",
        "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)",
        "org.jruby.runtime.Block.call(Block.java:139)",
        "org.jruby.RubyProc.call(RubyProc.java:318)",
        "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)",
        "java.base/java.lang.Thread.run(Thread.java:829)"
    ], 
    :thread=>"#<Thread:0x413e7d42 sleep>"
}

To identify the problem, I debugged Logstash with a breakpoint where the exception was raised, to capture the full stack trace:

Exception java.lang.reflect.InvocationTargetException
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.base/java.lang.reflect.Method.invoke(Method.java:566)
           at org.logstash.ackedqueue.Queue.deserialize(Queue.java:699)
           at org.logstash.ackedqueue.Batch.deserializeElements(Batch.java:89)
           at org.logstash.ackedqueue.Batch.<init>(Batch.java:49)
           at org.logstash.ackedqueue.Queue.readPageBatch(Queue.java:630)
           at org.logstash.ackedqueue.Queue.readBatch(Queue.java:563)
           at org.logstash.ackedqueue.ext.JRubyAckedQueueExt.readBatch(JRubyAckedQueueExt.java:150)
           at org.logstash.ackedqueue.AckedReadBatch.create(AckedReadBatch.java:49)
           at org.logstash.ext.JrubyAckedReadClientExt.readBatch(JrubyAckedReadClientExt.java:87)
           at org.logstash.execution.WorkerLoop.run(WorkerLoop.java:82)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.base/java.lang.reflect.Method.invoke(Method.java:566)
           at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:441)
           at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:305)
           at org.jruby.java.invokers.InstanceMethodInvoker.call(InstanceMethodInvoker.java:32)
           at usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$block$start_workers$5(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:295)
           at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)
           at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)
           at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)
           at org.jruby.runtime.Block.call(Block.java:139)
           at org.jruby.RubyProc.call(RubyProc.java:318)
           at org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)
           at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Invalid format: "null" (through reference chain: java.util.HashMap["DATA"]->org.logstash.ConvertedMap["siem_properties"]->org.logstash.ConvertedMap["logstash_received_at"]
           at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:394)
           at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:353)
           at com.fasterxml.jackson.databind.deser.std.ContainerDeserializerBase.wrapAndThrow(ContainerDeserializerBase.java:181)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:539)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:364)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:116)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromAny(AsArrayTypeDeserializer.java:71)
           at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserializeWithType(UntypedObjectDeserializer.java:712)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:529)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:364)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:116)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromAny(AsArrayTypeDeserializer.java:71)
           at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserializeWithType(UntypedObjectDeserializer.java:712)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:529)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:364)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:116)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromObject(AsArrayTypeDeserializer.java:61)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserializeWithType(MapDeserializer.java:400)
           at com.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:68)
           at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4014)
           at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3122)
           at org.logstash.Event.fromSerializableMap(Event.java:234)
           at org.logstash.Event.deserialize(Event.java:510)
           ... 28 more
Caused by: java.lang.IllegalArgumentException: Invalid format: "null"
           at org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:945)
           at org.logstash.Timestamp.<init>(Timestamp.java:64)
           at org.logstash.ObjectMappers$TimestampDeserializer.deserialize(ObjectMappers.java:273)
           at org.logstash.ObjectMappers$TimestampDeserializer.deserialize(ObjectMappers.java:262)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer._deserialize(AsArrayTypeDeserializer.java:116)
           at com.fasterxml.jackson.databind.jsontype.impl.AsArrayTypeDeserializer.deserializeTypedFromAny(AsArrayTypeDeserializer.java:71)
           at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserializeWithType(UntypedObjectDeserializer.java:712)
           at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:529)
           ... 50 more

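The innermost failure can be reproduced in isolation: a timestamp field that was serialized as the literal string "null" cannot be parsed back. A minimal sketch, using the JDK's java.time parser as a stand-in for the Joda-Time parser that Logstash's Timestamp uses internally (the class and method names here are illustrative, not Logstash code):

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

public class NullTimestampDemo {
    // Mimics what org.logstash.Timestamp's constructor does: parse an ISO-8601 string.
    static String tryParse(String raw) {
        try {
            Instant parsed = Instant.parse(raw);
            return "parsed: " + parsed;
        } catch (DateTimeParseException e) {
            // Joda-Time reports the equivalent failure as: Invalid format: "null"
            return "invalid: \"" + raw + "\"";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryParse("2021-09-23T10:03:27.175Z"));
        System.out.println(tryParse("null"));
    }
}
```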
This last stack trace tells me the problem is with the field [siem_properties][logstash_received_at]. This field is of type LogStash::Timestamp and is set in the Logstash pipeline with the ruby filter plugin like this:

ruby {
    code => "event.set('[siem_properties][logstash_received_at]', Time.now.utc)"
}
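For what it's worth, Time.now.utc returns a plain Ruby Time here, which Logstash coerces to LogStash::Timestamp on event.set. A variant that constructs the Logstash type explicitly (shown only as a possible experiment, not a known fix) would be:

```
ruby {
    code => "event.set('[siem_properties][logstash_received_at]', LogStash::Timestamp.now)"
}
```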

I noticed that the same problem can happen with the @timestamp field, which I do not set directly.


To narrow down where the queue corruption is introduced, I patched the serialize() method in org/logstash/Event.java to immediately deserialize the just-serialized event and check its validity:

public byte[] serialize() throws JsonProcessingException {
    HashMap<String, ConvertedMap> map = new HashMap<>(2, 1.0f);
    map.put(DATA_MAP_KEY, this.data);
    map.put(META_MAP_KEY, this.metadata);
    byte[] result = ObjectMappers.CBOR_MAPPER.writeValueAsBytes(map);
    try {
        // Round-trip check: if the bytes we just wrote cannot be read back,
        // the event was already corrupted at serialization time, not later on disk.
        Event.deserialize(result);
    }
    catch (Exception e) {
        logger.info("Error serializing event, exception=" + e);
    }
    return result;
}
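The idea behind the patch, validating freshly written bytes by reading them straight back, can be sketched standalone with a toy codec in place of the CBOR mapper (all names here are illustrative, not Logstash APIs):

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class RoundTripCheck {
    // Toy codec: join key=value pairs; a null value is written as the literal "null".
    static byte[] serialize(Map<String, String> data) {
        StringBuilder sb = new StringBuilder();
        data.forEach((k, v) -> sb.append(k).append('=').append(v).append('\n'));
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    // The deserializer rejects the literal "null", like the Timestamp parser does.
    static void deserialize(byte[] bytes) {
        for (String line : new String(bytes, StandardCharsets.UTF_8).split("\n")) {
            if (line.endsWith("=null")) {
                throw new IllegalArgumentException("Invalid format: \"null\" in " + line);
            }
        }
    }

    // Write-time validation: catch bad bytes before they would reach the queue.
    static boolean roundTripOk(Map<String, String> data) {
        byte[] result = serialize(data);
        try {
            deserialize(result);
            return true;
        } catch (IllegalArgumentException e) {
            System.out.println("Error serializing event, exception=" + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripOk(Map.of("@timestamp", "2021-09-27T14:12:42.984Z")));
        System.out.println(roundTripOk(Map.of("@timestamp", "null")));
    }
}
```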

After the patch, the following lines showed up in the log:

[2021-09-27T14:12:42,984][INFO ][org.logstash.Event       ][win-logins-correlation][5985a8e6151e894bad3977df5c1a852b021ef839cb037366465d8f7b654289ac] Error serializing event, exception=com.fasterxml.jackson.databind.JsonMappingException: Invalid format: "null" (through reference chain: java.util.HashMap["DATA"]->org.logstash.ConvertedMap["_@timestamp"])
[2021-09-27T14:12:43,027][INFO ][org.logstash.Event       ][win-logins-correlation][5985a8e6151e894bad3977df5c1a852b021ef839cb037366465d8f7b654289ac] Error serializing event, exception=com.fasterxml.jackson.databind.JsonMappingException: Invalid format: "null" (through reference chain: java.util.HashMap["DATA"]->org.logstash.ConvertedMap["_@timestamp"])
[2021-09-27T14:12:43,071][INFO ][org.logstash.Event       ][win-logins-correlation][5985a8e6151e894bad3977df5c1a852b021ef839cb037366465d8f7b654289ac] Error serializing event, exception=com.fasterxml.jackson.databind.JsonMappingException: Invalid format: "null" (through reference chain: java.util.HashMap["DATA"]->org.logstash.ConvertedMap["_@timestamp"])

I am unable to pinpoint the exact cause or create a reproducible example, because I am using multiple pipelines in a fairly complex environment, but I can try to come up with an example if necessary. I can also run some experiments in my environment if needed.

I also have an example of corrupted queue files to analyze, if that would be useful.


To give some context about my logstash pipelines:

  • I have got about 15 pipelines, but only 3 of them are probably related to the problem.

Those are:

winlogbeat-5001 ➡️ win-logins-correlation ➡️ output-elasticsearch

  • winlogbeat-5001: receives data from a beats {} input, adds the field [siem_properties][logstash_received_at], and forwards events to the next pipeline.
  • win-logins-correlation: receives events from winlogbeat-5001, does some processing with the aggregate {} plugin (including storing timestamps in the aggregate maps and adding them to a new event), and forwards events to the next pipeline.
  • output-elasticsearch: receives events from win-logins-correlation and sends them to Elasticsearch.
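For reference, the chaining above uses Logstash's pipeline-to-pipeline communication. A minimal sketch of the two ends of one link, assuming the pipeline addresses mirror the pipeline names (the rest of each pipeline's config is omitted):

```
# In winlogbeat-5001: forward events downstream
output {
  pipeline { send_to => ["win-logins-correlation"] }
}

# In win-logins-correlation: receive them
input {
  pipeline { address => "win-logins-correlation" }
}
```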
