Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@

public class CatalogHandlers {
private static final Schema EMPTY_SCHEMA = new Schema();
private static final String INTIAL_PAGE_TOKEN = "";

private CatalogHandlers() {}

Expand Down Expand Up @@ -117,6 +118,29 @@ public static ListNamespacesResponse listNamespaces(
return ListNamespacesResponse.builder().addAll(results).build();
}

public static ListNamespacesResponse listNamespaces(
SupportsNamespaces catalog, Namespace parent, String pageToken, String pageSize) {
List<Namespace> results;
List<Namespace> subResults;

if (parent.isEmpty()) {
results = catalog.listNamespaces();
} else {
results = catalog.listNamespaces(parent);
}

int start = INTIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken);
int end = start + Integer.parseInt(pageSize);
subResults = results.subList(start, end);
String nextToken = String.valueOf(end);

if (end >= results.size()) {
nextToken = null;
}

return ListNamespacesResponse.builder().addAll(subResults).nextPageToken(nextToken).build();
}

public static CreateNamespaceResponse createNamespace(
SupportsNamespaces catalog, CreateNamespaceRequest request) {
Namespace namespace = request.namespace();
Expand Down Expand Up @@ -174,6 +198,23 @@ public static ListTablesResponse listTables(Catalog catalog, Namespace namespace
return ListTablesResponse.builder().addAll(idents).build();
}

public static ListTablesResponse listTables(
Catalog catalog, Namespace namespace, String pageToken, String pageSize) {
List<TableIdentifier> results = catalog.listTables(namespace);
List<TableIdentifier> subResults;

int start = INTIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken);
int end = start + Integer.parseInt(pageSize);
subResults = results.subList(start, end);
String nextToken = String.valueOf(end);

if (end >= results.size()) {
nextToken = null;
}

return ListTablesResponse.builder().addAll(subResults).nextPageToken(nextToken).build();
}

public static LoadTableResponse stageTableCreate(
Catalog catalog, Namespace namespace, CreateTableRequest request) {
request.validate();
Expand Down Expand Up @@ -397,6 +438,23 @@ public static ListTablesResponse listViews(ViewCatalog catalog, Namespace namesp
return ListTablesResponse.builder().addAll(catalog.listViews(namespace)).build();
}

public static ListTablesResponse listViews(
ViewCatalog catalog, Namespace namespace, String pageToken, String pageSize) {
List<TableIdentifier> results = catalog.listViews(namespace);
List<TableIdentifier> subResults;

int start = INTIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken);
int end = start + Integer.parseInt(pageSize);
subResults = results.subList(start, end);
String nextToken = String.valueOf(end);

if (end >= results.size()) {
nextToken = null;
}

return ListTablesResponse.builder().addAll(subResults).nextPageToken(nextToken).build();
}

public static LoadViewResponse createView(
ViewCatalog catalog, Namespace namespace, CreateViewRequest request) {
request.validate();
Expand Down
99 changes: 71 additions & 28 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";
private static final String REST_METRICS_REPORTING_ENABLED = "rest-metrics-reporting-enabled";
private static final String REST_SNAPSHOT_LOADING_MODE = "snapshot-loading-mode";
public static final String REST_PAGE_SIZE = "rest-page-size";
private static final List<String> TOKEN_PREFERENCE_ORDER =
ImmutableList.of(
OAuth2Properties.ID_TOKEN_TYPE,
Expand All @@ -136,6 +137,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private FileIO io = null;
private MetricsReporter reporter = null;
private boolean reportingViaRestEnabled;
private Integer pageSize = null;
private CloseableGroup closeables = null;

// a lazy thread pool for token refresh
Expand Down Expand Up @@ -228,6 +230,12 @@ public void initialize(String name, Map<String, String> unresolved) {
client, tokenRefreshExecutor(), token, expiresAtMillis(mergedProps), catalogAuth);
}

this.pageSize = PropertyUtil.propertyAsNullableInt(mergedProps, REST_PAGE_SIZE);
if (pageSize != null) {
Preconditions.checkArgument(
pageSize > 0, "Invalid value for %s, must be a positive integer", REST_PAGE_SIZE);
}

this.io = newFileIO(SessionContext.createEmpty(), mergedProps);

this.fileIOCloser = newFileIOCloser();
Expand Down Expand Up @@ -278,14 +286,27 @@ public void setConf(Object newConf) {
@Override
public List<TableIdentifier> listTables(SessionContext context, Namespace ns) {
checkNamespaceIsValid(ns);
Map<String, String> queryParams = Maps.newHashMap();
ImmutableList.Builder<TableIdentifier> tables = ImmutableList.builder();
String pageToken = "";
if (pageSize != null) {
queryParams.put("pageSize", String.valueOf(pageSize));
}

ListTablesResponse response =
client.get(
paths.tables(ns),
ListTablesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
return response.identifiers();
do {
queryParams.put("pageToken", pageToken);
ListTablesResponse response =
client.get(
paths.tables(ns),
queryParams,
ListTablesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
pageToken = response.nextPageToken();
tables.addAll(response.identifiers());
} while (pageToken != null);

return tables.build();
}

@Override
Expand Down Expand Up @@ -494,22 +515,31 @@ public void createNamespace(

@Override
public List<Namespace> listNamespaces(SessionContext context, Namespace namespace) {
Map<String, String> queryParams;
if (namespace.isEmpty()) {
queryParams = ImmutableMap.of();
} else {
// query params should be unescaped
queryParams = ImmutableMap.of("parent", RESTUtil.NAMESPACE_JOINER.join(namespace.levels()));
Map<String, String> queryParams = Maps.newHashMap();
if (!namespace.isEmpty()) {
queryParams.put("parent", RESTUtil.NAMESPACE_JOINER.join(namespace.levels()));
}

ListNamespacesResponse response =
client.get(
paths.namespaces(),
queryParams,
ListNamespacesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
return response.namespaces();
ImmutableList.Builder<Namespace> namespaces = ImmutableList.builder();
String pageToken = "";
if (pageSize != null) {
queryParams.put("pageSize", String.valueOf(pageSize));
}

do {
queryParams.put("pageToken", pageToken);
ListNamespacesResponse response =
client.get(
paths.namespaces(),
queryParams,
ListNamespacesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
pageToken = response.nextPageToken();
namespaces.addAll(response.namespaces());
} while (pageToken != null);

return namespaces.build();
}

@Override
Expand Down Expand Up @@ -1048,14 +1078,27 @@ public void commitTransaction(SessionContext context, List<TableCommit> commits)
@Override
public List<TableIdentifier> listViews(SessionContext context, Namespace namespace) {
checkNamespaceIsValid(namespace);
Map<String, String> queryParams = Maps.newHashMap();
ImmutableList.Builder<TableIdentifier> views = ImmutableList.builder();
String pageToken = "";
if (pageSize != null) {
queryParams.put("pageSize", String.valueOf(pageSize));
}

ListTablesResponse response =
client.get(
paths.views(namespace),
ListTablesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
return response.identifiers();
do {
queryParams.put("pageToken", pageToken);
ListTablesResponse response =
client.get(
paths.views(namespace),
queryParams,
ListTablesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
pageToken = response.nextPageToken();
views.addAll(response.identifiers());
} while (pageToken != null);

return views.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
public class ListNamespacesResponse implements RESTResponse {

private List<Namespace> namespaces;
private String nextPageToken;

public ListNamespacesResponse() {
// Required for Jackson deserialization
}

private ListNamespacesResponse(List<Namespace> namespaces) {
private ListNamespacesResponse(List<Namespace> namespaces, String nextPageToken) {
this.namespaces = namespaces;
this.nextPageToken = nextPageToken;
validate();
}

Expand All @@ -48,9 +50,16 @@ public List<Namespace> namespaces() {
return namespaces != null ? namespaces : ImmutableList.of();
}

public String nextPageToken() {
return nextPageToken;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("namespaces", namespaces()).toString();
return MoreObjects.toStringHelper(this)
.add("namespaces", namespaces())
.add("next-page-token", nextPageToken())
.toString();
}

public static Builder builder() {
Expand All @@ -59,6 +68,7 @@ public static Builder builder() {

public static class Builder {
private final ImmutableList.Builder<Namespace> namespaces = ImmutableList.builder();
private String nextPageToken;

private Builder() {}

Expand All @@ -75,8 +85,13 @@ public Builder addAll(Collection<Namespace> toAdd) {
return this;
}

public Builder nextPageToken(String pageToken) {
nextPageToken = pageToken;
return this;
}

public ListNamespacesResponse build() {
return new ListNamespacesResponse(namespaces.build());
return new ListNamespacesResponse(namespaces.build(), nextPageToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
public class ListTablesResponse implements RESTResponse {

private List<TableIdentifier> identifiers;
private String nextPageToken;

public ListTablesResponse() {
// Required for Jackson deserialization
}

private ListTablesResponse(List<TableIdentifier> identifiers) {
private ListTablesResponse(List<TableIdentifier> identifiers, String nextPageToken) {
this.identifiers = identifiers;
this.nextPageToken = nextPageToken;
validate();
}

Expand All @@ -49,9 +51,16 @@ public List<TableIdentifier> identifiers() {
return identifiers != null ? identifiers : ImmutableList.of();
}

public String nextPageToken() {
return nextPageToken;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("identifiers", identifiers).toString();
return MoreObjects.toStringHelper(this)
.add("identifiers", identifiers)
.add("next-page-token", nextPageToken())
.toString();
}

public static Builder builder() {
Expand All @@ -60,6 +69,7 @@ public static Builder builder() {

public static class Builder {
private final ImmutableList.Builder<TableIdentifier> identifiers = ImmutableList.builder();
private String nextPageToken;

private Builder() {}

Expand All @@ -76,8 +86,13 @@ public Builder addAll(Collection<TableIdentifier> toAdd) {
return this;
}

public Builder nextPageToken(String pageToken) {
nextPageToken = pageToken;
return this;
}

public ListTablesResponse build() {
return new ListTablesResponse(identifiers.build());
return new ListTablesResponse(identifiers.build(), nextPageToken);
}
}
}
32 changes: 29 additions & 3 deletions core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,17 @@ public <T extends RESTResponse> T handleRequest(
ns = Namespace.empty();
}

return castResponse(responseType, CatalogHandlers.listNamespaces(asNamespaceCatalog, ns));
String pageToken = PropertyUtil.propertyAsString(vars, "pageToken", null);
String pageSize = PropertyUtil.propertyAsString(vars, "pageSize", null);

if (pageSize != null) {
return castResponse(
responseType,
CatalogHandlers.listNamespaces(asNamespaceCatalog, ns, pageToken, pageSize));
} else {
return castResponse(
responseType, CatalogHandlers.listNamespaces(asNamespaceCatalog, ns));
}
}
break;

Expand Down Expand Up @@ -339,7 +349,14 @@ public <T extends RESTResponse> T handleRequest(
case LIST_TABLES:
{
Namespace namespace = namespaceFromPathVars(vars);
return castResponse(responseType, CatalogHandlers.listTables(catalog, namespace));
String pageToken = PropertyUtil.propertyAsString(vars, "pageToken", null);
String pageSize = PropertyUtil.propertyAsString(vars, "pageSize", null);
if (pageSize != null) {
return castResponse(
responseType, CatalogHandlers.listTables(catalog, namespace, pageToken, pageSize));
} else {
return castResponse(responseType, CatalogHandlers.listTables(catalog, namespace));
}
}

case CREATE_TABLE:
Expand Down Expand Up @@ -412,7 +429,16 @@ public <T extends RESTResponse> T handleRequest(
{
if (null != asViewCatalog) {
Namespace namespace = namespaceFromPathVars(vars);
return castResponse(responseType, CatalogHandlers.listViews(asViewCatalog, namespace));
String pageToken = PropertyUtil.propertyAsString(vars, "pageToken", null);
String pageSize = PropertyUtil.propertyAsString(vars, "pageSize", null);
if (pageSize != null) {
return castResponse(
responseType,
CatalogHandlers.listViews(asViewCatalog, namespace, pageToken, pageSize));
} else {
return castResponse(
responseType, CatalogHandlers.listViews(asViewCatalog, namespace));
}
}
break;
}
Expand Down
Loading