diff --git a/CHANGELOG.md b/CHANGELOG.md index fb246cfe..2d5784fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## v0.0.67 (2021-07-15) +## v0.0.67 (%as) * [573b31c](https://github.com/argoproj-labs/argo-dataflow/commit/573b31c50db92c85aad9fbf640a79bbfb67410cd) feat: stan reliable auto reconnection (#112) @@ -8,7 +8,7 @@ * Derek Wang -## v0.0.66 (2021-07-15) +## v0.0.66 (%as) * [8f81586](https://github.com/argoproj-labs/argo-dataflow/commit/8f81586a710505decd2855b02629a60d7d4ac276) feat: add version metrics. Closes #109 * [1adec35](https://github.com/argoproj-labs/argo-dataflow/commit/1adec358710bf0af3c84f5de5be7263b0b3bc6ca) fix: add retries labels @@ -18,7 +18,7 @@ * Alex Collins -## v0.0.65 (2021-07-13) +## v0.0.65 (%as) * [e8fa437](https://github.com/argoproj-labs/argo-dataflow/commit/e8fa437ccd0f82ed6a2b50a194edb1b738444a22) fix: use shared counter for Kafka commit * [3ad7cf1](https://github.com/argoproj-labs/argo-dataflow/commit/3ad7cf1ddcf70752ebdd9684bb8ccb10aed0d8f1) fix: stop Kafka dropping messages on disruption @@ -32,7 +32,7 @@ * Alex Collins -## v0.0.64 (2021-07-12) +## v0.0.64 (%as) * [11b2033](https://github.com/argoproj-labs/argo-dataflow/commit/11b2033b81a4ac2ecdd9e449cd3c11bba652e653) feat: update resources to only apply to built-ins @@ -40,7 +40,7 @@ * Alex Collins -## v0.0.63 (2021-07-12) +## v0.0.63 (%as) * [d389e44](https://github.com/argoproj-labs/argo-dataflow/commit/d389e447caa132a668596e5f0c159f9368b572a4) feat: change pod hash when image changes @@ -48,13 +48,13 @@ * Alex Collins -## v0.0.62 (2021-07-12) +## v0.0.62 (%as) ### Contributors -## v0.0.61 (2021-07-12) +## v0.0.61 (%as) * [f306c0c](https://github.com/argoproj-labs/argo-dataflow/commit/f306c0c2bada95e71a44a6a3045812508256e77a) fix: allow lastUpdated to be empty @@ -62,7 +62,7 @@ * Alex Collins -## v0.0.60 (2021-07-12) +## v0.0.60 (%as) * [6ac6810](https://github.com/argoproj-labs/argo-dataflow/commit/6ac68101dc04c6aefb6cf645331040e0668f25e9) feat(controller): use version from build * [858d717](https://github.com/argoproj-labs/argo-dataflow/commit/858d71754e542564728772d96863549294696a7d) feat: support HTTP headers for HTTP sink @@ -83,7 +83,7 @@ * Derek Wang * Saravanan Balasubramanian -## v0.0.59 (2021-06-29) +## v0.0.59 (%as) * [778f24a](https://github.com/argoproj-labs/argo-dataflow/commit/778f24a5ab34ad6f8b3b422010e0c149e0be1c0d) fix: prevent Kafka pending loop dieing on disconnection * [f289249](https://github.com/argoproj-labs/argo-dataflow/commit/f2892494ca727f0541b44378796d18beb9bde134) refactor: container killer refactor @@ -93,7 +93,7 @@ * Alex Collins -## v0.0.58 (2021-06-28) +## v0.0.58 (%as) * [70acfa7](https://github.com/argoproj-labs/argo-dataflow/commit/70acfa7895545e6c93116b3ae6a918d0549be73e) fix: improved dedupe * [69e43d6](https://github.com/argoproj-labs/argo-dataflow/commit/69e43d6fbf0858fd042ebafb84c53cfaf4b7e377) test http fmea @@ -102,7 +102,7 @@ * Alex Collins -## v0.0.57 (2021-06-24) +## v0.0.57 (%as) * [972ebf0](https://github.com/argoproj-labs/argo-dataflow/commit/972ebf03c232bb37ac72bcd99d39b072381eeca2) fix: increased default CPU resources requests to 250m * [786bc05](https://github.com/argoproj-labs/argo-dataflow/commit/786bc05ceaaa7059202ea20881f7bd00953c1f5e) fix: move patching step status earlier in shutdown sequence @@ -118,7 +118,7 @@ * Alex Collins -## v0.0.56 (2021-06-22) +## v0.0.56 (%as) * [ed68cb0](https://github.com/argoproj-labs/argo-dataflow/commit/ed68cb02c8ca114c9e39708ec7586b056ce76948) config: ARGO_DATAFLOW_UPDATE_INTERVAL=10s for dev * [c564148](https://github.com/argoproj-labs/argo-dataflow/commit/c564148e90a2fb7cdcc0038f0469e2956b450ec5) fix(controller): correct service name @@ -127,7 +127,7 @@ * Alex Collins -## v0.0.55 (2021-06-21) +## v0.0.55 (%as) * [e5fa079](https://github.com/argoproj-labs/argo-dataflow/commit/e5fa0791858fda3bdc43ccfd8f7348975e9dc905) feat: pprof sidecar * [3c0d169](https://github.com/argoproj-labs/argo-dataflow/commit/3c0d169b2fd111e313d9a76b62405f1795665edd) fix: Kafka pending wrong when re-using pipeline @@ -140,7 +140,7 @@ * Alex Collins * Derek Wang -## v0.0.54 (2021-06-17) +## v0.0.54 (%as) * [0d68892](https://github.com/argoproj-labs/argo-dataflow/commit/0d688927826574de93aaa8fb9351b108408291ee) feat: add HTTP source service name * [e2728c6](https://github.com/argoproj-labs/argo-dataflow/commit/e2728c66fd766f2ebf2f0b24df693c9592cc25df) feat(sidecar): disable Kafka auto-commit @@ -157,7 +157,7 @@ * Alex Collins -## v0.0.53 (2021-06-16) +## v0.0.53 (%as) * [454c0d7](https://github.com/argoproj-labs/argo-dataflow/commit/454c0d7e23767f868f00d56e4c4499c4a1db6668) fix: set MaxInFlight and AckWait for stan * [6b7be67](https://github.com/argoproj-labs/argo-dataflow/commit/6b7be6738c49daf757f09f96e18cdb1c9710913c) fix: fix logger @@ -167,7 +167,7 @@ * Alex Collins * Derek Wang -## v0.0.52 (2021-06-15) +## v0.0.52 (%as) * [12a98cd](https://github.com/argoproj-labs/argo-dataflow/commit/12a98cd3e549e3fafc727341b7093ee555614911) fix: stan manual ack @@ -175,7 +175,7 @@ * Derek Wang -## v0.0.51 (2021-06-15) +## v0.0.51 (%as) * [88f77f7](https://github.com/argoproj-labs/argo-dataflow/commit/88f77f7e308d2c1a3ab8277b9ed8011d3de7f4ec) feat: change Errors to RecentErrors * [a4db306](https://github.com/argoproj-labs/argo-dataflow/commit/a4db30654f2c198e8582b2a9d424156081dc1842) fix: bug in STAN ack @@ -184,7 +184,7 @@ * Alex Collins -## v0.0.50 (2021-06-15) +## v0.0.50 (%as) * [6f7bb50](https://github.com/argoproj-labs/argo-dataflow/commit/6f7bb505b636d5de21921040d1e9f6ca20a66d0a) config: change argo-server to HTTP * [4770ed6](https://github.com/argoproj-labs/argo-dataflow/commit/4770ed65d9e0fedd5215157acf6ada268bfd02d4) config: change argo-server to HTTP @@ -198,7 +198,7 @@ * Alex Collins * wanghong230 -## v0.0.49 (2021-06-10) +## v0.0.49 (%as) * [23fc919](https://github.com/argoproj-labs/argo-dataflow/commit/23fc9195e87a74e798b5045a758a06b16a26d975) fix: Add missing kubernetes requirement in setup.py (#64) * [dbc403d](https://github.com/argoproj-labs/argo-dataflow/commit/dbc403d4d23a1a047392861e26d1aedcda04f061) fix: fix Python @@ -212,7 +212,7 @@ * Alex Collins * Yuan Tang -## v0.0.48 (2021-06-10) +## v0.0.48 (%as) * [e61c307](https://github.com/argoproj-labs/argo-dataflow/commit/e61c307d9bed9ae33892c57cde4da5a06c26ffbb) feat: add context to Java runtime * [5becca6](https://github.com/argoproj-labs/argo-dataflow/commit/5becca695ef81e26e116d578f6385262ed881638) feat: add context to Git example @@ -222,7 +222,7 @@ * Alex Collins -## v0.0.47 (2021-06-10) +## v0.0.47 (%as) * [34b75ae](https://github.com/argoproj-labs/argo-dataflow/commit/34b75ae05de30b1e80fa1eb0312d398c59095c77) fix: change default retryPolicy=Always * [a9e83fa](https://github.com/argoproj-labs/argo-dataflow/commit/a9e83fa0b033ae9a7c1355d14a4b30acbb946bea) feat: add Golang context @@ -231,7 +231,7 @@ * Alex Collins -## v0.0.46 (2021-06-09) +## v0.0.46 (%as) * [268b1ec](https://github.com/argoproj-labs/argo-dataflow/commit/268b1ecd89e9cd86be2384e44eeda441d4d190b8) fix: removed parallel * [4cd5e8b](https://github.com/argoproj-labs/argo-dataflow/commit/4cd5e8b248142a8baec7170935de4658fca7ea62) feat: switch to manual ack @@ -242,7 +242,7 @@ * Alex Collins * Derek Wang -## v0.0.45 (2021-06-09) +## v0.0.45 (%as) * [b8e88fe](https://github.com/argoproj-labs/argo-dataflow/commit/b8e88fe3ba1aa1c62cb965560ccbcfddbeb6e61b) fix: nats pending messages type assersion error @@ -250,7 +250,7 @@ * Derek Wang -## v0.0.44 (2021-06-09) +## v0.0.44 (%as) * [12f0e5e](https://github.com/argoproj-labs/argo-dataflow/commit/12f0e5e9f4cf2dbd792f94a5656bddf7aba6052b) fix: correct total counter * [bf7b993](https://github.com/argoproj-labs/argo-dataflow/commit/bf7b99324199fc26de068ded51a01f4aba312657) fix: nats pending messages @@ -260,7 +260,7 @@ * Alex Collins * Derek Wang -## v0.0.43 (2021-06-08) +## v0.0.43 (%as) * [a03615d](https://github.com/argoproj-labs/argo-dataflow/commit/a03615d7ccd5d7ed68479cdebff76b0ce5e05a17) ok @@ -268,7 +268,7 @@ * Alex Collins -## v0.0.42 (2021-06-08) +## v0.0.42 (%as) * [d2f219f](https://github.com/argoproj-labs/argo-dataflow/commit/d2f219f806277636ab5b4c41dd26ea87c887ec3a) ok * [abbf7d0](https://github.com/argoproj-labs/argo-dataflow/commit/abbf7d09b6ef6ca36d7538acfc663907233b08b6) fix: various issues @@ -280,7 +280,7 @@ * Alex Collins -## v0.0.41 (2021-06-07) +## v0.0.41 (%as) * [cd2082e](https://github.com/argoproj-labs/argo-dataflow/commit/cd2082e893ac130a4367894b8f1a924fad070f84) feat: simplify so only replica 0 report metrics @@ -288,7 +288,7 @@ * Alex Collins -## v0.0.40 (2021-06-07) +## v0.0.40 (%as) * [da6a1e0](https://github.com/argoproj-labs/argo-dataflow/commit/da6a1e0e396a9d225ec1b8b25213597d69b995eb) fix: fix permissions @@ -296,7 +296,7 @@ * Alex Collins -## v0.0.39 (2021-06-07) +## v0.0.39 (%as) * [e1cc03c](https://github.com/argoproj-labs/argo-dataflow/commit/e1cc03c070744057d2f4cce0bfb32aa19861a696) config: expose 9090 * [5f9c09c](https://github.com/argoproj-labs/argo-dataflow/commit/5f9c09c62a72cb779b5595fae24dbaf4df6be400) config: expose 9090 @@ -305,7 +305,7 @@ * Alex Collins -## v0.0.38 (2021-06-07) +## v0.0.38 (%as) * [fcfb45f](https://github.com/argoproj-labs/argo-dataflow/commit/fcfb45f880d7b414136f16be32dcd5bcdaf303e3) fix: bugs in grouping * [bf55468](https://github.com/argoproj-labs/argo-dataflow/commit/bf55468731dfb63244b66062ab30c93c9edfb992) refactor: de-couple scaling @@ -317,7 +317,7 @@ * Alex Collins -## v0.0.37 (2021-06-06) +## v0.0.37 (%as) * [267fd6b](https://github.com/argoproj-labs/argo-dataflow/commit/267fd6b03450b70629c3d7b43674fb988d863a34) feat: added latency metric * [c1c88da](https://github.com/argoproj-labs/argo-dataflow/commit/c1c88da7ff9a4448f08541cdab6ddabfe61b8d88) fix: in_flight -> inflight @@ -326,7 +326,7 @@ * Alex Collins -## v0.0.36 (2021-06-05) +## v0.0.36 (%as) * [ca7edf1](https://github.com/argoproj-labs/argo-dataflow/commit/ca7edf16c34075efb77d0cc1e76c40b943ab75d5) feat: add in_flight metric * [009731a](https://github.com/argoproj-labs/argo-dataflow/commit/009731a0bab298f1d50a6a0f64278ddfba7ca2a5) refactor: made code more testable @@ -336,7 +336,7 @@ * Alex Collins -## v0.0.35 (2021-06-04) +## v0.0.35 (%as) * [421c142](https://github.com/argoproj-labs/argo-dataflow/commit/421c14269c618eb1a1c601715e6ff361d45d9921) fix: total/error calcs * [06fc5f0](https://github.com/argoproj-labs/argo-dataflow/commit/06fc5f0592ce74982864f133af5dd405948451f0) fix: only return pending for replica=0 @@ -345,7 +345,7 @@ * Alex Collins -## v0.0.34 (2021-06-04) +## v0.0.34 (%as) * [c73ce5b](https://github.com/argoproj-labs/argo-dataflow/commit/c73ce5b33a6bfc42abf772d6f794c6af885499a0) fix: change from patch to update for the controller * [4a32fb9](https://github.com/argoproj-labs/argo-dataflow/commit/4a32fb9fcd72181bb6299d7d123166e29369a8e7) fix: kafka pending messages @@ -355,7 +355,7 @@ * Alex Collins * Derek Wang -## v0.0.33 (2021-06-04) +## v0.0.33 (%as) * [60eab31](https://github.com/argoproj-labs/argo-dataflow/commit/60eab313fe856ee34b44d8f7cfcb4057d01c6344) fix: prevent violent scale-up and scale-down by only scaling by 1 each time @@ -363,7 +363,7 @@ * Alex Collins -## v0.0.32 (2021-06-04) +## v0.0.32 (%as) * [ca363fe](https://github.com/argoproj-labs/argo-dataflow/commit/ca363fe3638fa6c329dc786dedb4aaf8d230a8f3) feat: add pending metric * [f3a4813](https://github.com/argoproj-labs/argo-dataflow/commit/f3a4813aaf23d630f4d8292792d6b147d53515f0) fix: fix metrics @@ -372,7 +372,7 @@ * Alex Collins -## v0.0.31 (2021-06-04) +## v0.0.31 (%as) * [3dd39f1](https://github.com/argoproj-labs/argo-dataflow/commit/3dd39f16bf6d1104af3d2a3e4c23b98c22170639) fix(sidecar): updated to use fixed counters * [6bef420](https://github.com/argoproj-labs/argo-dataflow/commit/6bef420f18718ca977365ff2ed0c46f213036ec6) fix: removed scrape annotations @@ -382,7 +382,7 @@ * Alex Collins -## v0.0.30 (2021-06-03) +## v0.0.30 (%as) * [c6763e5](https://github.com/argoproj-labs/argo-dataflow/commit/c6763e5b1f7f8112d0e7806b2344e78b15863810) feat(runner): Request Bearer token * [9d8676b](https://github.com/argoproj-labs/argo-dataflow/commit/9d8676b5243c70ee48a211f30d1564403fa84fd7) feat(runner): Emit Prometheus metrics (#57) @@ -393,7 +393,7 @@ * Alex Collins -## v0.0.29 (2021-06-02) +## v0.0.29 (%as) * [6156b57](https://github.com/argoproj-labs/argo-dataflow/commit/6156b57f71d829df6eb8717d4a34de52a49d9511) fix: bug where we were not getting kafka secret @@ -401,24 +401,9 @@ * Alex Collins -## v0.0.28 (2021-05-27) +## v0.0.28 (%as) * [57567e3](https://github.com/argoproj-labs/argo-dataflow/commit/57567e3cfae15ebcbd2b3801a2b3db3b3c4b68e9) fix!: change rate to resource.Quantity - -### Contributors - - * Alex Collins - -## v0.0.27 (2021-05-27) - - * [98eadec](https://github.com/argoproj-labs/argo-dataflow/commit/98eadec373d57799e83377a0e048793ddec53f0a) fix!: change rate to resource.Quantity - -### Contributors - - * Alex Collins - -## v0.0.26 (2021-05-26) - * [53c4d04](https://github.com/argoproj-labs/argo-dataflow/commit/53c4d040a61e04488c717d06c3cec2c1729658e7) feat: mark all Kafka messages * [dcee8f5](https://github.com/argoproj-labs/argo-dataflow/commit/dcee8f59a2bdf3861c7475d840ec31f05bdc808e) config: remove secrets for stan/kafka @@ -426,7 +411,7 @@ * Alex Collins -## v0.0.25 (2021-05-25) +## v0.0.25 (%as) * [980c741](https://github.com/argoproj-labs/argo-dataflow/commit/980c741576235f8eb04fcecd48ef1de3c59e69d4) fix: try and avoid partition changes @@ -434,13 +419,13 @@ * Alex Collins -## v0.0.24 (2021-05-24) +## v0.0.24 (%as) ### Contributors -## v0.0.23 (2021-05-24) +## v0.0.23 (%as) * [a561027](https://github.com/argoproj-labs/argo-dataflow/commit/a5610275123a0ce82f52ca870749cde9135c238d) fix: fixed bug with pipeline conditions and messages computation * [45f51fa](https://github.com/argoproj-labs/argo-dataflow/commit/45f51fa86e36cf3a1240a73e0774a9a0f02e8149) feat: implement an ordered shutdown sequence @@ -449,7 +434,7 @@ * Alex Collins -## v0.0.22 (2021-05-22) +## v0.0.22 (%as) * [2ad2f52](https://github.com/argoproj-labs/argo-dataflow/commit/2ad2f52651cb8b4283a6cc9ef71c92c0d7c70c24) feat: allow messages to be return to request * [59669ca](https://github.com/argoproj-labs/argo-dataflow/commit/59669cabf52bd479cfae32e2bc7a74c49062a340) fix: correct http POST to use keep-alives @@ -458,7 +443,7 @@ * Alex Collins -## v0.0.21 (2021-05-21) +## v0.0.21 (%as) * [22b0b8b](https://github.com/argoproj-labs/argo-dataflow/commit/22b0b8b436ea3bef7e0f2aa17fe223e31e51eb30) fix: fix bouncy scaling bug @@ -466,7 +451,7 @@ * Alex Collins -## v0.0.20 (2021-05-21) +## v0.0.20 (%as) * [de8a19f](https://github.com/argoproj-labs/argo-dataflow/commit/de8a19fcb6960af8096ddb3bb944a06c883482b4) fix: enhanced shutdown @@ -474,7 +459,7 @@ * Alex Collins -## v0.0.19 (2021-05-20) +## v0.0.19 (%as) * [f3ba148](https://github.com/argoproj-labs/argo-dataflow/commit/f3ba1481e3cfa45e675ea7e875939b0d21346c79) fix: correct metrics over restart * [3c0b7fd](https://github.com/argoproj-labs/argo-dataflow/commit/3c0b7fd34e1828104cc8b6c4ecaa71bb21f751d4) config: pump more data thorough Kafka topic @@ -486,7 +471,7 @@ * Alex Collins -## v0.0.18 (2021-05-20) +## v0.0.18 (%as) * [9d04dcd](https://github.com/argoproj-labs/argo-dataflow/commit/9d04dcde97f04eed1f54852eb7aaf8b6f3b79e49) refactor: change from `for {}` to `wait.JitterUntil` * [e17f961](https://github.com/argoproj-labs/argo-dataflow/commit/e17f96180cb4af617303862b2a2ea212d5ddfff3) feat: only check Kafka partition for pending @@ -499,7 +484,7 @@ * Alex Collins -## v0.0.17 (2021-05-19) +## v0.0.17 (%as) * [e4a4b1a](https://github.com/argoproj-labs/argo-dataflow/commit/e4a4b1a0a523c88190f833c4c06ec7fc0dd4fde2) feat: longer status messages @@ -507,13 +492,13 @@ * Alex Collins -## v0.0.16 (2021-05-19) +## v0.0.16 (%as) ### Contributors -## v0.0.15 (2021-05-19) +## v0.0.15 (%as) * [9bef0df](https://github.com/argoproj-labs/argo-dataflow/commit/9bef0df2ec1eb398f63c9ba4b99be76b8b08aee8) feat: expose pod failure reason @@ -521,7 +506,7 @@ * Alex Collins -## v0.0.14 (2021-05-18) +## v0.0.14 (%as) * [2a23cb8](https://github.com/argoproj-labs/argo-dataflow/commit/2a23cb8abc4343272d4dd04f493d888694923114) fix: scale to 1 rather than 0 on start @@ -529,7 +514,7 @@ * Alex Collins -## v0.0.13 (2021-05-18) +## v0.0.13 (%as) * [6204cd1](https://github.com/argoproj-labs/argo-dataflow/commit/6204cd1b085b354647495ec9866b1629fc474eb4) fix: surface errors * [c000e41](https://github.com/argoproj-labs/argo-dataflow/commit/c000e4152710e9a1a1d78516e396b3892ed55e7f) fix: make minReplicas required @@ -538,7 +523,7 @@ * Alex Collins -## v0.0.12 (2021-05-18) +## v0.0.12 (%as) * [3089119](https://github.com/argoproj-labs/argo-dataflow/commit/3089119f0447727cd493b1116244d90ac6dc3a1e) fix: only terminate sidecars if the main container exit with code 0 * [6dc28a2](https://github.com/argoproj-labs/argo-dataflow/commit/6dc28a23802b922e9c074dfe93911e9ad4763012) fix: failed to record sink status @@ -548,7 +533,7 @@ * Alex Collins -## v0.0.11 (2021-05-18) +## v0.0.11 (%as) * [b22e9fd](https://github.com/argoproj-labs/argo-dataflow/commit/b22e9fd986499a6f92bb417e246f2fe8eeaed98c) fix: correct changelog order @@ -556,7 +541,7 @@ * Alex Collins -## v0.0.10 (2021-05-18) +## v0.0.10 (%as) * [dd8efe3](https://github.com/argoproj-labs/argo-dataflow/commit/dd8efe31cccc613fd15490c7ab9922fd8f1e3896) feat: and support for stateless sources and sinks with HTTP @@ -564,7 +549,7 @@ * Alex Collins -## v0.0.9 (2021-05-17) +## v0.0.9 (%as) * [940632a](https://github.com/argoproj-labs/argo-dataflow/commit/940632a82a7e43ff8eed63cb6a69949dbec85372) feat: and 1st-class support for expand and flatten * [9e6ab3d](https://github.com/argoproj-labs/argo-dataflow/commit/9e6ab3dfbd0713f443700bc9997384143a74264b) feat: support layout of cron source messages @@ -575,13 +560,13 @@ * Alex Collins -## v0.0.8 (2021-05-14) +## v0.0.8 (%as) ### Contributors -## v0.0.7 (2021-05-14) +## v0.0.7 (%as) * [7e3a574](https://github.com/argoproj-labs/argo-dataflow/commit/7e3a5747cf7c60f1bbf2012ca58736dc3552abaf) config: increase stan-default storage to 16Gi * [9166870](https://github.com/argoproj-labs/argo-dataflow/commit/91668702230ef7cf5f3a74989cd7ae1b587f3dc0) config: increase stan-default storage to 16Gi @@ -590,7 +575,7 @@ * Alex Collins -## v0.0.6 (2021-05-14) +## v0.0.6 (%as) * [8c2dd33](https://github.com/argoproj-labs/argo-dataflow/commit/8c2dd33e45125ced12a322df952fb7d40dcae066) fix(manager): re-instate killing terminated steps * [98917eb](https://github.com/argoproj-labs/argo-dataflow/commit/98917eb248100eb87d098d52d36fbd8707e4da9f) fix: report sink errors @@ -599,7 +584,7 @@ * Alex Collins -## v0.0.5 (2021-05-13) +## v0.0.5 (%as) * [1fbfbd1](https://github.com/argoproj-labs/argo-dataflow/commit/1fbfbd1fe690ddf508ca0b6aa8bedda3ed7ad7ba) fix: correct `error` to `lastError` @@ -607,7 +592,7 @@ * Alex Collins -## v0.0.4 (2021-05-13) +## v0.0.4 (%as) * [0d0c6d5](https://github.com/argoproj-labs/argo-dataflow/commit/0d0c6d56c2ff5bf7a6bab48fd2d1066885125ced) fix: remove version file * [99bfd15](https://github.com/argoproj-labs/argo-dataflow/commit/99bfd151acb810e9b9452f21a76de68d403fc081) refactor: move api/util to ../shared/ @@ -620,7 +605,7 @@ * Alex Collins -## v0.0.3 (2021-05-12) +## v0.0.3 (%as) * [f341a7d](https://github.com/argoproj-labs/argo-dataflow/commit/f341a7deec13f5962a2ddd6604a1e0c07c1cd2ac) config: change argo-server to secure * [fd75eb3](https://github.com/argoproj-labs/argo-dataflow/commit/fd75eb3763b10ca8de54316bf085b4012c74e99e) config: change argo-server to secure @@ -630,7 +615,7 @@ * Alex Collins -## v0.0.2 (2021-05-11) +## v0.0.2 (%as) ### Contributors diff --git a/api/v1alpha1/stan.go b/api/v1alpha1/stan.go index a929ad74..ebf05ec8 100644 --- a/api/v1alpha1/stan.go +++ b/api/v1alpha1/stan.go @@ -20,6 +20,10 @@ type STAN struct { Subject string `json:"subject" protobuf:"bytes,3,opt,name=subject"` SubjectPrefix SubjectPrefix `json:"subjectPrefix,omitempty" protobuf:"bytes,6,opt,name=subjectPrefix,casttype=SubjectPrefix"` Auth *STANAuth `json:"auth,omitempty" protobuf:"bytes,7,opt,name=auth"` + // Max inflight messages when subscribing to the stan server, which means how many messages + // between commits, therefore potential duplicates during disruption + // +kubebuilder:default=20 + MaxInflight uint32 `json:"maxInflight,omitempty" protobuf:"bytes,9,opt,name=maxInflight"` } type STANAuth struct { @@ -34,3 +38,10 @@ func (s *STAN) AuthStrategy() STANAuthStrategy { } return STANAuthNone } + +func (s *STAN) GetMaxInflight() int { + if s.MaxInflight < 1 { + return CommitN + } + return int(s.MaxInflight) +} diff --git a/config/ci.yaml b/config/ci.yaml index 9c1f2e04..5425a3a1 100644 --- a/config/ci.yaml +++ b/config/ci.yaml @@ -832,6 +832,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -935,6 +940,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -2793,6 +2803,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -2896,6 +2911,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -4370,6 +4390,7 @@ metadata: stringData: authToken: testingtokentestingtoken clusterId: stan + maxInflight: "15" natsMonitoringUrl: http://stan:8222 natsUrl: nats subjectPrefix: NamespacedPipelineName diff --git a/config/crd/bases/dataflow.argoproj.io_pipelines.yaml b/config/crd/bases/dataflow.argoproj.io_pipelines.yaml index a175ef2d..ee141d62 100644 --- a/config/crd/bases/dataflow.argoproj.io_pipelines.yaml +++ b/config/crd/bases/dataflow.argoproj.io_pipelines.yaml @@ -1220,6 +1220,14 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing + to the stan server, which means how many messages + between commits, therefore potential duplicates + during disruption + format: int32 + type: integer name: default: default type: string @@ -1332,6 +1340,14 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing + to the stan server, which means how many messages + between commits, therefore potential duplicates + during disruption + format: int32 + type: integer name: default: default type: string diff --git a/config/crd/bases/dataflow.argoproj.io_steps.yaml b/config/crd/bases/dataflow.argoproj.io_steps.yaml index 0a56abfc..9b2675ee 100644 --- a/config/crd/bases/dataflow.argoproj.io_steps.yaml +++ b/config/crd/bases/dataflow.argoproj.io_steps.yaml @@ -1176,6 +1176,13 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the + stan server, which means how many messages between commits, + therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -1286,6 +1293,13 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the + stan server, which means how many messages between commits, + therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string diff --git a/config/default.yaml b/config/default.yaml index 1951ec7f..8c1ebfe3 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -832,6 +832,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -935,6 +940,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -2793,6 +2803,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -2896,6 +2911,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string diff --git a/config/dev.yaml b/config/dev.yaml index baf84437..b2ee73d5 100644 --- a/config/dev.yaml +++ b/config/dev.yaml @@ -832,6 +832,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -935,6 +940,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -2793,6 +2803,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -2896,6 +2911,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -4370,6 +4390,7 @@ metadata: stringData: authToken: testingtokentestingtoken clusterId: stan + maxInflight: "15" natsMonitoringUrl: http://stan:8222 natsUrl: nats subjectPrefix: NamespacedPipelineName diff --git a/config/quick-start.yaml b/config/quick-start.yaml index 1951ec7f..8c1ebfe3 100644 --- a/config/quick-start.yaml +++ b/config/quick-start.yaml @@ -832,6 +832,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -935,6 +940,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -2793,6 +2803,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string @@ -2896,6 +2911,11 @@ spec: type: object clusterId: type: string + maxInflight: + default: 20 + description: Max inflight messages when subscribing to the stan server, which means how many messages between commits, therefore potential duplicates during disruption + format: int32 + type: integer name: default: default type: string diff --git a/examples/dataflow-stan-default-secret.yaml b/examples/dataflow-stan-default-secret.yaml index 670c2410..b8d90ad2 100644 --- a/examples/dataflow-stan-default-secret.yaml +++ b/examples/dataflow-stan-default-secret.yaml @@ -15,3 +15,4 @@ stringData: natsMonitoringUrl: http://stan:8222 subjectPrefix: NamespacedPipelineName authToken: testingtokentestingtoken + maxInflight: "15" diff --git a/runner/sidecar/sinks.go b/runner/sidecar/sinks.go index e615defd..0a8c991c 100644 --- a/runner/sidecar/sinks.go +++ b/runner/sidecar/sinks.go @@ -152,23 +152,24 @@ func connectSTANSink(ctx context.Context, sinkName string, x *dfv1.STAN) (func(m go func() { defer runtimeutil.HandleCrash() logger.Info("starting stan auto reconnection daemon", "sink", sinkName) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() for { - time.Sleep(5 * time.Second) select { case <-ctx.Done(): logger.Info("exiting stan auto reconnection daemon", "sink", sinkName) return - default: - } - if conn == nil || conn.IsClosed() { - logger.Info("stan connection lost, reconnecting...", "sink", sinkName) - clientID := genClientID() - conn, err = ConnectSTAN(ctx, x, clientID) - if err != nil { - logger.Error(err, "failed to reconnect", "sink", sinkName, "clientID", clientID) - continue + case <-ticker.C: + if conn == nil || conn.IsClosed() { + logger.Info("stan connection lost, reconnecting...", "sink", sinkName) + clientID := genClientID() + conn, err = ConnectSTAN(ctx, x, clientID) + if err != nil { + logger.Error(err, "failed to reconnect", "sink", sinkName, "clientID", clientID) + continue + } + logger.Info("reconnected to stan server.", "sink", sinkName, "clientID", clientID) } - logger.Info("reconnected to stan server.", "sink", sinkName, "clientID", clientID) } } }() diff --git a/runner/sidecar/sources.go b/runner/sidecar/sources.go index 7ad97d92..d0573bae 100644 --- a/runner/sidecar/sources.go +++ b/runner/sidecar/sources.go @@ -243,7 +243,7 @@ func connectSTANSource(ctx context.Context, sourceName string, x *dfv1.STAN, f f stan.SetManualAckMode(), stan.StartAt(pb.StartPosition_NewOnly), stan.AckWait(30*time.Second), - stan.MaxInflight(dfv1.CommitN)) + stan.MaxInflight(x.GetMaxInflight())) if err != nil { return nil, fmt.Errorf("failed to subscribe: %w", err) } @@ -265,28 +265,29 @@ func connectSTANSource(ctx context.Context, sourceName string, x *dfv1.STAN, f f go func() { defer runtimeutil.HandleCrash() logger.Info("starting stan auto reconnection daemon", "source", sourceName) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() for { - time.Sleep(5 * time.Second) select { case <-ctx.Done(): logger.Info("exiting stan auto reconnection daemon", "source", sourceName) return - default: - } - if conn == nil || conn.IsClosed() { - _ = sub.Close() - logger.Info("stan connection lost, reconnecting...", "source", sourceName) - clientID := genClientID() - conn, err = ConnectSTAN(ctx, x, clientID) - if err != nil { - logger.Error(err, "failed to reconnect", "source", sourceName, "clientID", clientID) - continue - } - logger.Info("reconnected to stan server.", "source", sourceName, "clientID", clientID) - if sub, err = subFunc(); err != nil { - logger.Error(err, "failed to subscribe after reconnection", "source", sourceName, "clientID", clientID) - // Close the connection to let it retry - _ = conn.Close() + case <-ticker.C: + if conn == nil || conn.IsClosed() { + _ = sub.Close() + logger.Info("stan connection lost, reconnecting...", "source", sourceName) + clientID := genClientID() + conn, err = ConnectSTAN(ctx, x, clientID) + if err != nil { + logger.Error(err, "failed to reconnect", "source", sourceName, "clientID", clientID) + continue + } + logger.Info("reconnected to stan server.", "source", sourceName, "clientID", clientID) + if sub, err = subFunc(); err != nil { + logger.Error(err, "failed to subscribe after reconnection", "source", sourceName, "clientID", clientID) + // Close the connection to let it retry + _ = conn.Close() + } } } } diff --git a/runner/sidecar/stan.go b/runner/sidecar/stan.go index 1ad066b8..1ba5d5cd 100644 --- a/runner/sidecar/stan.go +++ b/runner/sidecar/stan.go @@ -3,6 +3,7 @@ package sidecar import ( "context" "fmt" + "strconv" dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" "github.com/nats-io/nats.go" @@ -22,11 +23,18 @@ func subjectiveStan(x *dfv1.STAN) { } } -func stanFromSecret(s *dfv1.STAN, secret *corev1.Secret) { +func stanFromSecret(s *dfv1.STAN, secret *corev1.Secret) error { s.NATSURL = dfv1.StringOr(s.NATSURL, string(secret.Data["natsUrl"])) s.NATSMonitoringURL = dfv1.StringOr(s.NATSMonitoringURL, string(secret.Data["natsMonitoringUrl"])) s.ClusterID = dfv1.StringOr(s.ClusterID, string(secret.Data["clusterId"])) s.SubjectPrefix = dfv1.SubjectPrefixOr(s.SubjectPrefix, dfv1.SubjectPrefix(secret.Data["subjectPrefix"])) + if b, ok := secret.Data["maxInflight"]; ok { + if i, err := strconv.ParseUint(string(b), 10, 32); err != nil { + return fmt.Errorf("failed to parse maxInflight: %w", err) + } else { + s.MaxInflight = uint32(i) + } + } if _, ok := secret.Data["authToken"]; ok { s.Auth = &dfv1.STANAuth{ Token: &corev1.SecretKeySelector{ @@ -37,6 +45,7 @@ func stanFromSecret(s *dfv1.STAN, secret *corev1.Secret) { }, } } + return nil } func enrichSTAN(ctx context.Context, secrets v1.SecretInterface, x *dfv1.STAN) error { @@ -46,7 +55,9 @@ func enrichSTAN(ctx context.Context, secrets v1.SecretInterface, x *dfv1.STAN) e return err } } else { - stanFromSecret(x, secret) + if err = stanFromSecret(x, secret); err != nil { + return err + } } subjectiveStan(x) return nil