|
30 | 30 | import org.elasticsearch.core.Nullable; |
31 | 31 | import org.elasticsearch.core.TimeValue; |
32 | 32 | import org.elasticsearch.index.VersionType; |
33 | | -import org.elasticsearch.index.mapper.IdFieldMapper; |
34 | | -import org.elasticsearch.index.mapper.IndexFieldMapper; |
35 | | -import org.elasticsearch.index.mapper.RoutingFieldMapper; |
36 | | -import org.elasticsearch.index.mapper.SourceFieldMapper; |
37 | | -import org.elasticsearch.index.mapper.VersionFieldMapper; |
38 | 33 | import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; |
39 | 34 | import org.elasticsearch.index.reindex.BulkByScrollResponse; |
40 | 35 | import org.elasticsearch.index.reindex.BulkByScrollTask; |
41 | 36 | import org.elasticsearch.index.reindex.ClientScrollableHitSource; |
42 | 37 | import org.elasticsearch.index.reindex.ScrollableHitSource; |
43 | 38 | import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure; |
44 | 39 | import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState; |
| 40 | +import org.elasticsearch.script.CtxMap; |
| 41 | +import org.elasticsearch.script.Metadata; |
45 | 42 | import org.elasticsearch.script.Script; |
46 | 43 | import org.elasticsearch.script.ScriptService; |
47 | 44 | import org.elasticsearch.search.Scroll; |
|
50 | 47 | import org.elasticsearch.threadpool.ThreadPool; |
51 | 48 |
|
52 | 49 | import java.util.ArrayList; |
53 | | -import java.util.Arrays; |
54 | 50 | import java.util.Collection; |
55 | 51 | import java.util.Collections; |
56 | | -import java.util.HashMap; |
57 | 52 | import java.util.HashSet; |
58 | 53 | import java.util.List; |
59 | | -import java.util.Locale; |
60 | 54 | import java.util.Map; |
61 | 55 | import java.util.Objects; |
62 | 56 | import java.util.Set; |
63 | 57 | import java.util.concurrent.ConcurrentHashMap; |
64 | 58 | import java.util.concurrent.atomic.AtomicInteger; |
65 | 59 | import java.util.concurrent.atomic.AtomicLong; |
66 | 60 | import java.util.function.BiFunction; |
| 61 | +import java.util.function.LongSupplier; |
67 | 62 |
|
68 | 63 | import static java.lang.Math.max; |
69 | 64 | import static java.lang.Math.min; |
@@ -819,147 +814,73 @@ public static RequestWrapper<DeleteRequest> wrap(DeleteRequest request) { |
819 | 814 | /** |
820 | 815 | * Apply a {@link Script} to a {@link RequestWrapper} |
821 | 816 | */ |
822 | | - public abstract static class ScriptApplier implements BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> { |
| 817 | + public abstract static class ScriptApplier<T extends Metadata> |
| 818 | + implements |
| 819 | + BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> { |
| 820 | + |
| 821 | + // "index" is the default operation |
| 822 | + protected static final String INDEX = "index"; |
823 | 823 |
|
824 | 824 | private final WorkerBulkByScrollTaskState taskWorker; |
825 | 825 | protected final ScriptService scriptService; |
826 | 826 | protected final Script script; |
827 | 827 | protected final Map<String, Object> params; |
| 828 | + protected final LongSupplier nowInMillisSupplier; |
828 | 829 |
|
829 | 830 | public ScriptApplier( |
830 | 831 | WorkerBulkByScrollTaskState taskWorker, |
831 | 832 | ScriptService scriptService, |
832 | 833 | Script script, |
833 | | - Map<String, Object> params |
| 834 | + Map<String, Object> params, |
| 835 | + LongSupplier nowInMillisSupplier |
834 | 836 | ) { |
835 | 837 | this.taskWorker = taskWorker; |
836 | 838 | this.scriptService = scriptService; |
837 | 839 | this.script = script; |
838 | 840 | this.params = params; |
| 841 | + this.nowInMillisSupplier = nowInMillisSupplier; |
839 | 842 | } |
840 | 843 |
|
841 | 844 | @Override |
842 | | - @SuppressWarnings("unchecked") |
843 | 845 | public RequestWrapper<?> apply(RequestWrapper<?> request, ScrollableHitSource.Hit doc) { |
844 | 846 | if (script == null) { |
845 | 847 | return request; |
846 | 848 | } |
847 | 849 |
|
848 | | - Map<String, Object> context = new HashMap<>(); |
849 | | - context.put(IndexFieldMapper.NAME, doc.getIndex()); |
850 | | - context.put(IdFieldMapper.NAME, doc.getId()); |
851 | | - Long oldVersion = doc.getVersion(); |
852 | | - context.put(VersionFieldMapper.NAME, oldVersion); |
853 | | - String oldRouting = doc.getRouting(); |
854 | | - context.put(RoutingFieldMapper.NAME, oldRouting); |
855 | | - context.put(SourceFieldMapper.NAME, request.getSource()); |
856 | | - |
857 | | - OpType oldOpType = OpType.INDEX; |
858 | | - context.put("op", oldOpType.toString()); |
| 850 | + CtxMap<T> ctxMap = execute(doc, request.getSource()); |
859 | 851 |
|
860 | | - execute(context); |
| 852 | + T metadata = ctxMap.getMetadata(); |
861 | 853 |
|
862 | | - String newOp = (String) context.remove("op"); |
863 | | - if (newOp == null) { |
864 | | - throw new IllegalArgumentException("Script cleared operation type"); |
865 | | - } |
| 854 | + request.setSource(ctxMap.getSource()); |
866 | 855 |
|
867 | | - /* |
868 | | - * It'd be lovely to only set the source if we know its been modified |
869 | | - * but it isn't worth keeping two copies of it around just to check! |
870 | | - */ |
871 | | - request.setSource((Map<String, Object>) context.remove(SourceFieldMapper.NAME)); |
| 856 | + updateRequest(request, metadata); |
872 | 857 |
|
873 | | - Object newValue = context.remove(IndexFieldMapper.NAME); |
874 | | - if (false == doc.getIndex().equals(newValue)) { |
875 | | - scriptChangedIndex(request, newValue); |
876 | | - } |
877 | | - newValue = context.remove(IdFieldMapper.NAME); |
878 | | - if (false == doc.getId().equals(newValue)) { |
879 | | - scriptChangedId(request, newValue); |
880 | | - } |
881 | | - newValue = context.remove(VersionFieldMapper.NAME); |
882 | | - if (false == Objects.equals(oldVersion, newValue)) { |
883 | | - scriptChangedVersion(request, newValue); |
884 | | - } |
885 | | - /* |
886 | | - * Its important that routing comes after parent in case you want to |
887 | | - * change them both. |
888 | | - */ |
889 | | - newValue = context.remove(RoutingFieldMapper.NAME); |
890 | | - if (false == Objects.equals(oldRouting, newValue)) { |
891 | | - scriptChangedRouting(request, newValue); |
892 | | - } |
| 858 | + return requestFromOp(request, metadata.getOp()); |
| 859 | + } |
893 | 860 |
|
894 | | - OpType newOpType = OpType.fromString(newOp); |
895 | | - if (newOpType != oldOpType) { |
896 | | - return scriptChangedOpType(request, oldOpType, newOpType); |
897 | | - } |
| 861 | + protected abstract CtxMap<T> execute(ScrollableHitSource.Hit doc, Map<String, Object> source); |
898 | 862 |
|
899 | | - if (false == context.isEmpty()) { |
900 | | - throw new IllegalArgumentException("Invalid fields added to context [" + String.join(",", context.keySet()) + ']'); |
901 | | - } |
902 | | - return request; |
903 | | - } |
| 863 | + protected abstract void updateRequest(RequestWrapper<?> request, T metadata); |
904 | 864 |
|
905 | | - protected RequestWrapper<?> scriptChangedOpType(RequestWrapper<?> request, OpType oldOpType, OpType newOpType) { |
906 | | - switch (newOpType) { |
907 | | - case NOOP -> { |
| 865 | + protected RequestWrapper<?> requestFromOp(RequestWrapper<?> request, String op) { |
| 866 | + switch (op) { |
| 867 | + case "noop" -> { |
908 | 868 | taskWorker.countNoop(); |
909 | 869 | return null; |
910 | 870 | } |
911 | | - case DELETE -> { |
| 871 | + case "delete" -> { |
912 | 872 | RequestWrapper<DeleteRequest> delete = wrap(new DeleteRequest(request.getIndex(), request.getId())); |
913 | 873 | delete.setVersion(request.getVersion()); |
914 | 874 | delete.setVersionType(VersionType.INTERNAL); |
915 | 875 | delete.setRouting(request.getRouting()); |
916 | 876 | return delete; |
917 | 877 | } |
918 | | - default -> throw new IllegalArgumentException( |
919 | | - "Unsupported operation type change from [" + oldOpType + "] to [" + newOpType + "]" |
920 | | - ); |
| 878 | + case INDEX -> { |
| 879 | + return request; |
| 880 | + } |
| 881 | + default -> throw new IllegalArgumentException("Unsupported operation type change from [" + INDEX + "] to [" + op + "]"); |
921 | 882 | } |
922 | 883 | } |
923 | | - |
924 | | - protected abstract void scriptChangedIndex(RequestWrapper<?> request, Object to); |
925 | | - |
926 | | - protected abstract void scriptChangedId(RequestWrapper<?> request, Object to); |
927 | | - |
928 | | - protected abstract void scriptChangedVersion(RequestWrapper<?> request, Object to); |
929 | | - |
930 | | - protected abstract void scriptChangedRouting(RequestWrapper<?> request, Object to); |
931 | | - |
932 | | - protected abstract void execute(Map<String, Object> ctx); |
933 | | - } |
934 | | - |
935 | | - public enum OpType { |
936 | | - |
937 | | - NOOP("noop"), |
938 | | - INDEX("index"), |
939 | | - DELETE("delete"); |
940 | | - |
941 | | - private final String id; |
942 | | - |
943 | | - OpType(String id) { |
944 | | - this.id = id; |
945 | | - } |
946 | | - |
947 | | - public static OpType fromString(String opType) { |
948 | | - String lowerOpType = opType.toLowerCase(Locale.ROOT); |
949 | | - return switch (lowerOpType) { |
950 | | - case "noop" -> OpType.NOOP; |
951 | | - case "index" -> OpType.INDEX; |
952 | | - case "delete" -> OpType.DELETE; |
953 | | - default -> throw new IllegalArgumentException( |
954 | | - "Operation type [" + lowerOpType + "] not allowed, only " + Arrays.toString(values()) + " are allowed" |
955 | | - ); |
956 | | - }; |
957 | | - } |
958 | | - |
959 | | - @Override |
960 | | - public String toString() { |
961 | | - return id.toLowerCase(Locale.ROOT); |
962 | | - } |
963 | 884 | } |
964 | 885 |
|
965 | 886 | static class ScrollConsumableHitsResponse { |
|
0 commit comments