diff --git a/logs-to-tsdb/kinesis-lambda.py b/logs-to-tsdb/kinesis-lambda.py index f33c1c7..c038f71 100644 --- a/logs-to-tsdb/kinesis-lambda.py +++ b/logs-to-tsdb/kinesis-lambda.py @@ -9,10 +9,10 @@ from warnings import warn # Database connection parameters -DB_HOST = os.environ['DB_HOST'] +DB_HOST = os.environ.get('DB_HOST') DB_PORT = '5432' DB_USER = 'tsuser' -DB_PASSWORD = os.environ['DB_PASSWORD'] +DB_PASSWORD = os.environ.get('DB_PASSWORD') DB_NAME = 'tsdb' log_pattern = re.compile(r'received: (.*})') @@ -24,25 +24,24 @@ def handler(event, context): pprint(event) warn(pformat(event)) # Connect to the TimescaleDB - print("Connecting to db host " + DB_HOST) - conn = psycopg2.connect( + if DB_HOST: + # normally we should connect, in tests we do not + print("Connecting to db host " + DB_HOST) + conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname=DB_NAME - ) - cursor = conn.cursor() + ) + cursor = conn.cursor() batched = [] - print("NOT decoding data") for record in event['Records']: - - payload = record['kinesis']['data'] - # it doesn't appear to be encoded - # payload = b64decode(record['kinesis']['data']).decode('utf-8') - data = json.loads(payload) + payload = b64decode(record['kinesis']['data']) + decoded = gzip.decompress(payload).decode('utf-8') + data = json.loads(decoded) for ev in data['logEvents']: # Transform log_event as required and add to batched_logs @@ -92,20 +91,18 @@ def handler(event, context): } event = { - "Records": [ - { - "kinesis": { - "data": json.dumps({ - "logEvents": [ - { - "timestamp": 1625239073000, - "message": "your log message" - } - ] - }) - } - } - ] + "Records": [{"awsRegion": "us-east-2", + "eventID": "shardId-000000000000:49652274695115981779048041884426019726251995147880366082", + "eventName": "aws:kinesis:record", + "eventSource": "aws:kinesis", + "eventSourceARN": "arn:aws:kinesis:us-east-2:967314784947:stream/ceramic-cas-log-stream-tnet", + "eventVersion": "1.0", + "invokeIdentityArn": "arn:aws:iam::967314784947:role/lambda_role", + "kinesis": {"approximateArrivalTimestamp": 1716361597.516, + "data": "H4sIAAAAAAAA/02RS2/TQBSF/0p0N2ySdF6e1y5AWqgoFBIBoqkqezyNHb9n/Ihd9b+jGipY3cV39B3p3CcorPfh0e7H2oKG95v95uFmu9ttrrawhGoorQMNiguKmZBMMQFLyKvjlau6GjRcWOMvjHVhkZpVW9p2ZUL/J7JrnQ0L0GBC/xDW6cXrpVLwQDIWGoMZU0QFCGMuGIllRA0JYAm+i7xxad2mVXmZ5q11HvQdvNvsPlVHv/uPwv1ctu1t2b5kniCNQQOVRHDMCEeIIUQFkpxRpBBRhFFEXzqZkpjTgCLGEBWcyiAQDJbQpoX1bVjUoLHAnHIcqAAJtXzdCjTcEUTYCgUrQvZIaMQ1DdZIqF/3i483t1++7Tef93rxZlOapHILZ5vO+nbhrLFpb2O9eDqASeMD6ANE4dE0LwvmPSOUFIzRjA95NyXT2CcTTWlGeZaU5+EcNFkcN1FS2V7UtTg24QGWB4j/muI01pkd9cRvsur7Nipu2dur6+v+NPKTuZx+up2NfObaIYvZron6D9XX9IcMh1lSVLHNZ012mnKe9I8uGriRZ+aQTKg6UdTmNMTqjJxEKQ6SLhpk+Nizx44MqlDVMDETVfls8/Pv/+kyMxZikKMQZdbldWPxyffEB8l5SoJiwmPSFg1JxpOfrJD2VCpzLs6DNcdZV7n0mJazjrK1wGtMyRorNEMTmrCa2QGe38Dz/fNv+/wUvtYCAAA=", + "kinesisSchemaVersion": "1.0", + "partitionKey": "e4883c0bfd788e81bb5d969a4fcd7d49", + "sequenceNumber": "49652274695115981779048041884426019726251995147880366082"}}] } pprint(event) context = {}