Skip to content

Commit

Permalink
gunzip before decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
gvelez17 committed May 22, 2024
1 parent 4c37e86 commit 333b3db
Showing 1 changed file with 23 additions and 26 deletions.
49 changes: 23 additions & 26 deletions logs-to-tsdb/kinesis-lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
from warnings import warn

# Database connection parameters
DB_HOST = os.environ['DB_HOST']
DB_HOST = os.environ.get('DB_HOST')
DB_PORT = '5432'
DB_USER = 'tsuser'
DB_PASSWORD = os.environ['DB_PASSWORD']
DB_PASSWORD = os.environ.get('DB_PASSWORD')
DB_NAME = 'tsdb'

log_pattern = re.compile(r'received: (.*})')
Expand All @@ -24,25 +24,24 @@ def handler(event, context):
pprint(event)
warn(pformat(event))
# Connect to the TimescaleDB
print("Connecting to db host " + DB_HOST)
conn = psycopg2.connect(
if DB_HOST:
# normally we should connect, in tests we do not
print("Connecting to db host " + DB_HOST)
conn = psycopg2.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
dbname=DB_NAME
)
cursor = conn.cursor()
)
cursor = conn.cursor()

batched = []

print("NOT decoding data")
for record in event['Records']:

payload = record['kinesis']['data']
# it doesn't appear to be encoded
# payload = b64decode(record['kinesis']['data']).decode('utf-8')
data = json.loads(payload)
payload = b64decode(record['kinesis']['data'])
decoded = gzip.decompress(payload).decode('utf-8')
data = json.loads(decoded)

for ev in data['logEvents']:
# Transform log_event as required and add to batched_logs
Expand Down Expand Up @@ -92,20 +91,18 @@ def handler(event, context):
}

event = {
"Records": [
{
"kinesis": {
"data": json.dumps({
"logEvents": [
{
"timestamp": 1625239073000,
"message": "your log message"
}
]
})
}
}
]
"Records": [{"awsRegion": "us-east-2",
"eventID": "shardId-000000000000:49652274695115981779048041884426019726251995147880366082",
"eventName": "aws:kinesis:record",
"eventSource": "aws:kinesis",
"eventSourceARN": "arn:aws:kinesis:us-east-2:967314784947:stream/ceramic-cas-log-stream-tnet",
"eventVersion": "1.0",
"invokeIdentityArn": "arn:aws:iam::967314784947:role/lambda_role",
"kinesis": {"approximateArrivalTimestamp": 1716361597.516,
"data": "H4sIAAAAAAAA/02RS2/TQBSF/0p0N2ySdF6e1y5AWqgoFBIBoqkqezyNHb9n/Ihd9b+jGipY3cV39B3p3CcorPfh0e7H2oKG95v95uFmu9ttrrawhGoorQMNiguKmZBMMQFLyKvjlau6GjRcWOMvjHVhkZpVW9p2ZUL/J7JrnQ0L0GBC/xDW6cXrpVLwQDIWGoMZU0QFCGMuGIllRA0JYAm+i7xxad2mVXmZ5q11HvQdvNvsPlVHv/uPwv1ctu1t2b5kniCNQQOVRHDMCEeIIUQFkpxRpBBRhFFEXzqZkpjTgCLGEBWcyiAQDJbQpoX1bVjUoLHAnHIcqAAJtXzdCjTcEUTYCgUrQvZIaMQ1DdZIqF/3i483t1++7Tef93rxZlOapHILZ5vO+nbhrLFpb2O9eDqASeMD6ANE4dE0LwvmPSOUFIzRjA95NyXT2CcTTWlGeZaU5+EcNFkcN1FS2V7UtTg24QGWB4j/muI01pkd9cRvsur7Nipu2dur6+v+NPKTuZx+up2NfObaIYvZron6D9XX9IcMh1lSVLHNZ012mnKe9I8uGriRZ+aQTKg6UdTmNMTqjJxEKQ6SLhpk+Nizx44MqlDVMDETVfls8/Pv/+kyMxZikKMQZdbldWPxyffEB8l5SoJiwmPSFg1JxpOfrJD2VCpzLs6DNcdZV7n0mJazjrK1wGtMyRorNEMTmrCa2QGe38Dz/fNv+/wUvtYCAAA=",
"kinesisSchemaVersion": "1.0",
"partitionKey": "e4883c0bfd788e81bb5d969a4fcd7d49",
"sequenceNumber": "49652274695115981779048041884426019726251995147880366082"}}]
}
pprint(event)
context = {}
Expand Down

0 comments on commit 333b3db

Please sign in to comment.