-
Notifications
You must be signed in to change notification settings - Fork 3k
AWS: Glue catalog lock interface #1823
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| * If set to true, it will use DynamoDB to enforce locking during commits. | ||
| */ | ||
| public static final String GLUE_CATALOG_LOCK_ENABLED = "gluecatalog.lock.enabled"; | ||
| public static final boolean GLUE_CATALOG_LOCK_ENABLED_DEFAULT = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should default to true because it is needed for correct operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I agree, but after a second thought, there are a few considerations for making this false by default:
- this is an additional AWS resource that charges the user if enabled, so it is better to let people explicitly do that
- for data warehouse ETLM use cases, the current Glue concurrent modification exception is good enough as a safe guard. I think it makes more sense for user to turn this on when they know there is a potential for conflict commits in use cases like streaming.
@yyanyy and @giovannifumarola, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is important to always be correct. If we want users to need to opt in, then we can add a configuration setting that must be supplied, like the dynamo table. If that isn't supplied, then loading the catalog fails with an error message. Then we know that the user configured a specific dynamo table and understands that dynamo will be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, also I'm not an expert in AWS free tier but according to this it seems like low provisioned capacity counts towards free tier, so charging might not be a big problem in normal use cases. If this is the case, I guess this also applies to how table gets set up (billingMode).
| * If the table does not exist, it will be created at runtime. | ||
| */ | ||
| public static final String GLUE_CATALOG_LOCK_TABLE = "gluecatalog.lock.table"; | ||
| public static final String GLUE_CATALOG_LOCK_TABLE_DEFAULT = "IcebergGlueCatalogLockTable"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we base this on a prefix + catalog name instead of a constant? That way we don't do something unsafe by default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered this option, and decided to go with the way of setting the entire table name. This is because people can specify different catalog names and still point to the same Glue catalog, and in that case they want to use the same lock table. So setting the entire table name seems to be the most flexible option to satisfy all use cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm just talking about the default value. If a user wants to set the table specifically, then I agree that it should be the lock.table property to override. But the default should be based on a prefix and the catalog name to avoid needless conflict.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Combined with the discussion above, I think not providing a default name seems to be the right approach to go, so that user knows exactly what table they are using, and its cost associated. Let me update with that.
| * After the given time in milliseconds, the lock is automatically expired. | ||
| */ | ||
| public static final String GLUE_CATALOG_LOCK_EXPIRE = "gluecatalog.lock.expire"; | ||
| public static final long GLUE_CATALOG_LOCK_EXPIRE_DEFAULT = TimeUnit.MINUTES.toMillis(2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commits can take longer than this. What about using the max retry time for commits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yes, I was planning to set it to 1200000ms which is hive.zookeeper.session.timeout, somehow typed it wrong...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking commit.retry.max-wait-ms from table properties, which is the maximum amount of time before an operation fails and will stop retrying. That way, we know that the committer that holds the lock will no longer retry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean commit.retry.total-timeout-ms? commit.retry.max-wait-ms is only 60000ms which is 1 minute.
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
| * @param expireMillis max duration to hold the lock in milliseconds | ||
| * @return if lock acquisition succeeded or not | ||
| */ | ||
| boolean tryLock(String database, String table, long expireMillis); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we extend this interface a little bit? Again to help us w/ Nessie. Being able to pass at the very least the new metadata location would broaden the types of actions the implementer of this class can do.
Similar: Could we call this ConcurrencyManager or something similar? To hihglight that this is helping negotiate commit-ability rather than a pure lock mechanism?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code only locks at the table level. I think in future we can add additional methods here, e.g. based on the implementation we can have lock at the row level, folder level, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you provide more details for what is needed in Nessie? I have not read too much into the Nessie package yet (will do during thanksgiving), but one thing I think make more sense for both use cases is to expose the interface like
boolean tryLock(String lockId, long expireMillis);
And then formulate the lock id based on different needs, does that work in your case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay @jackye1995 . I've been thinking a lot about this and its not straigthtforward to do consistently. If nessie acts as the lock then acquiring the lock is effectively commiting the update to Nessie. A failure in the Glue layer would then require a rollback of nessie or the table is in an inconsistent state across Nessie/Glue. Obviously thats not acceptable, but possible. Let me think more and I can update you on Monday.
I like your suggestion re lockId. Doing a String may not be quite right as it might need excessive parsing by the impl. Nessie specifically needs at minimum TableIdentifier (or some derivation of it) and the current metadata location. I guess dynamo needs simply the TableIdentifier. So we could add the metadata location to the interface or we can collapse into a lock id object of some sort? The unlock could then take that object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now I have updated the interface to take a Sting lockId. I am curious about how exactly are you thinking about integrating Nessie with Glue, so I am just leaving the interface to be as open as possible for easier integration in the future. The use of an object LockId seems a bit convoluted, since the interface was intended to be used as an internal lock mechanism for Glue catalog, but based on your description it's like Glue and Nessie doing duplicated work. Let's discuss this in slack and have another PR for the actual work if necessary.
|
Thanks a lot @jackye1995 for the awesome work! Just left a few suggestions to help w/ integration work. |
| * @param expireMillis max duration to hold the lock in milliseconds | ||
| * @return if lock acquisition succeeded or not | ||
| */ | ||
| boolean tryLock(String database, String table, long expireMillis); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code only locks at the table level. I think in future we can add additional methods here, e.g. based on the implementation we can have lock at the row level, folder level, etc.
|
|
||
| /** | ||
| * The DynamoDB table used for locking. | ||
| * One lock table is designed to be used for only one catalog. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not clear. We are using a single table for a single customer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it is not straightforward to get a consistent glue catalog ID, unless we always initialize a client to fetch the AWS account and region from the caller, which creates lots of complications. By doing this, you basically have one dynamo lock table for one Glue catalog, which is simpler.
| .tableName(tableName) | ||
| .keySchema(schema) | ||
| .attributeDefinitions(definitions) | ||
| .billingMode(BillingMode.PAY_PER_REQUEST) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why pay per request instead of provisioned?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By having this, there is no cost if the table is created but not used. Users can manually change it or create the table by themselves through getLockTableSchema and getLockTableColDefinitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a good thing to document.
| while (!isTableActive) { | ||
| LOG.info("waiting for DynamoDB table <{}> to be active", tableName); | ||
| try { | ||
| Thread.sleep(5000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add as config? or is it a standard aws timeout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is one that doesn't need to be configured. It is just how long between checks, right?
That said, 5s seems a little long to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I don't think it needs to be configured. The table creation process in Dynamo typically take around 3-10 seconds, that is why I put 5 seconds.
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
| Map<String, AttributeValue> key = Maps.newHashMap(); | ||
| key.put(LOCK_TABLE_COL_TABLE_ID, AttributeValue.builder().s(tableId).build()); | ||
|
|
||
| dynamo.deleteItem(DeleteItemRequest.builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add log line.
We should inform if the item does not exist in the table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what log line do you mean? It does not throw exception or have any indication if it is not in the table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to do a conditional delete to ensure that this does not release the lock held by another process. As long as that requires a conditional delete, it would make sense to add attribute_exists to ensure that this only succeeds if the lock exists and was still held by this process. Then a failure of either assumption will result in ConditionalCheckFailedException and this can log that the lock was no longer held.
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
| * One lock table is designed to be used for only one catalog. | ||
| * It is recommended to use a different table name for each Glue catalog. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is "One lock table is designed to be used for only one catalog" still accurate? It seems to me that we are able to have one lock table controlling multiple catalogs? And I think if we do want to recommend one table per catalog, we probably want to update the default to be prefix + catalog name to follow our own recommendation just as Ryan commented above. Although I'm not super clear on what exactly is the benefit of having one ddb table per catalog...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there will always be a confusion of Glue catalog ID versus the catalog name concept in Iceberg. The reason I don't want to do what you propose is that:
- we will be forcing users to use the same catalog name for the same Glue catalog, which might not be the case, so I don't want to make catalog name the identifier for uniqueness.
- Getting the actual Glue catalog ID is not straightforward, there is no API for it and there might be proxies, different endpoints involved pointing to different catalogs, so it is hard to get a consistent id.
I don't see any particular benefit for sharing the same dynamo table across all catalogs, since Dynamo itself is serverless and dividing them seems to have more flexibility, so I think the current design should be fine.
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
| * If set to true, it will use DynamoDB to enforce locking during commits. | ||
| */ | ||
| public static final String GLUE_CATALOG_LOCK_ENABLED = "gluecatalog.lock.enabled"; | ||
| public static final boolean GLUE_CATALOG_LOCK_ENABLED_DEFAULT = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, also I'm not an expert in AWS free tier but according to this it seems like low provisioned capacity counts towards free tier, so charging might not be a big problem in normal use cases. If this is the case, I guess this also applies to how table gets set up (billingMode).
|
Does it still relevant, because of this |
this is still needed for Glue |
|
I'll try to take a look at this tomorrow. |
There is an unresolved issue for the |
aws/src/main/java/org/apache/iceberg/aws/glue/DefaultLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/DynamoLockManager.java
Outdated
Show resolved
Hide resolved
| * @param lockId lock ID | ||
| * @throws LockNotAcquiredException if lock is not acquired | ||
| */ | ||
| void acquire(LockId lockId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use lock and unlock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a significant reason at all, but otherwise the config key would be like lock.lock-timeout-ms which has 2 locks...
I am okay with either lock & unlock or acquire & release.
build.gradle
Outdated
| compile project(':iceberg-api') | ||
| compile project(':iceberg-core') | ||
|
|
||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: unnecessary whitespace change.
aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java
Outdated
Show resolved
Hide resolved
| return loadLockManager(properties.get(CatalogProperties.LOCK_IMPL), properties); | ||
| } else { | ||
| return defaultLockManager(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me.
|
|
||
| Map<String, AttributeValue> expressionValues = Maps.newHashMap(); | ||
| expressionValues.put(":eid", AttributeValue.builder().s(lockId.entityId()).build()); | ||
| expressionValues.put(":vid", AttributeValue.builder().s(currentItem.get(COL_VERSION).s()).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also store the interval in the row? That way, the process that holds the lock shares its expectation with other processes. If you don't do that, then another process could use a wait interval shorter than the lock holder's heartbeat interval and incorrectly unlock. So the expected wait interval should come from the process that currently holds the lock, not the process attempting to get it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the situation you describe would happen. The heartbeat timeout duration is already stored (L270), and the lock acquisition process would wait for the stored heartbeat timeout duration before trying to acquire (L211).
This means the total acquisition time might exceed lock.acquire-timeout-ms, because it will first wait for the heartbeat timeout duration and then check if acquisition time has timed out or not. And the lock.acquire-interval-ms can be very very short, and this lock mechanism would still work.
Maybe you are looking at the wrong line. The expressionValues here are only used to check against CONDITION_LOCK_ENTITY_NOT_EXIST_OR_VERSION_MATCH, the actual data stored is at toNewItem at L270.
aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java
Outdated
Show resolved
Hide resolved
| checkMetadataLocation(glueTable, base); | ||
| Map<String, String> properties = prepareProperties(glueTable, newMetadataLocation); | ||
| try { | ||
| lock(newMetadataLocation); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because this locks in the try block, the finally will be called when the lock fails. I think that's correct for cleaning up the metadata location, but the lock release will currently fail because the lock isn't held by this thread. That will cause the lock failure exception to get replaced by the unlock failure.
I think the solution is to not throw exceptions in release. I'll comment on that below.
| public void release(String entityId, String ownerId) { | ||
| DefaultLockContent currentContent = LOCKS.get(entityId); | ||
| if (currentContent == null) { | ||
| throw new IllegalArgumentException("Cannot find lock for entity " + entityId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IllegalArgumentException isn't quite right. The argument is correct, it is the lock state that is the problem. IllegalStateException is a better one to throw for this and the next case.
We also need to consider what this exception would cause if thrown. The release method is currently called in a finally block so if it throws an exception, it will replace the real failure. I noted that above where if the lock can't be acquired, either this or the next check's exception will be thrown.
In addition to that problem, a failure here could cause duplicate data. If the table state is updated, then success must be reported back. If not, then it is very likely that the operation will retry at some level and there is no guarantee that the operation is idempotent. For example, a writer moving data from Kafka to an Iceberg table will append data. If a checkpoint commit fails, it will retry. If the failure happened after the commit actually succeeded, then the append will be done twice causing duplicate data in the table.
Most of the time, a failure like that would cause the metadata file that was to be committed to be deleted, which ends up locking up the table. But this unlock happens after the delete would happen, so the table would appear correct but the operation would probably retry.
I think the solution to these problems is to make this log the error and return a boolean. If the lock was held and released, return true. If it was not held, then return false. That way the caller can decide how to handle it. And we should log the error messages you have here for context.
| static class InMemoryLockManager extends BaseLockManager { | ||
|
|
||
| private static final Map<String, DefaultLockContent> LOCKS = Maps.newConcurrentMap(); | ||
| private static final Map<String, ScheduledFuture<?>> HEARTBEATS = Maps.newHashMap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with including this in the implementation, but it doesn't seem to me that there are that many cases where an internal heartbeat helps. If the thread that owns the lock fails, the heartbeat thread will continue to live. So the heartbeat is really a process heartbeat. And because the locks are all process-local anyway there isn't much value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes this is more for illustration purpose.
aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java
Outdated
Show resolved
Hide resolved
aws/src/main/java/org/apache/iceberg/aws/glue/LockManagers.java
Outdated
Show resolved
Hide resolved
aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java
Show resolved
Hide resolved
aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java
Outdated
Show resolved
Hide resolved
| public void before() { | ||
| lockEntityId = UUID.randomUUID().toString(); | ||
| ownerId = UUID.randomUUID().toString(); | ||
| lockManager = new LockManagers.InMemoryLockManager(Maps.newHashMap()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How long does this take to run? It seems like a good idea to use some settings to avoid long runtimes. What about setting the timeout to 1s or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a timeout rule
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I meant configuring the lock manager timeouts to small increments, not adding a Timeout. Since you adjust settings in individual tests, it should be fine.
aws/src/test/java/org/apache/iceberg/aws/glue/InMemoryLockManagerTest.java
Outdated
Show resolved
Hide resolved
| * Release a lock | ||
| * @param entityId ID of the entity to lock | ||
| * @param ownerId ID of the owner if the lock | ||
| * @return if the lock for the entity of the owner is released |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't quite correct. It should return true if the lock was held and released, and false otherwise. This should also note that the contract requires not throwing exceptions from this method.
Allow Glue catalog to use an external DynamoDB table to enforce serializable isolation during commit.