Skip to content

Commit

Permalink
fix: jobsdb handle: unicode low surrogate must follow a high surrogate (
Browse files Browse the repository at this point in the history
#4762)

* fix: jobsdb handle: unicode low surrogate must follow a high surrogate

* handle sanitize json error

* add more tests
  • Loading branch information
lvrach committed Jun 6, 2024
1 parent d053245 commit 4155fa2
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 49 deletions.
92 changes: 57 additions & 35 deletions jobsdb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jobsdb
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -1027,66 +1028,87 @@ func TestJobsDB_SanitizeJSON(t *testing.T) {
_ = startPostgres(t)
jobDB := Handle{config: config.New()}
ch := func(n int) string {
return strings.Repeat(string("?"), n)
return strings.Repeat("�", n)
}
toValidUTF8Tests := []struct {
in string
out string
err error
}{
{`\u0000`, ""},
{`\u0000☺\u0000b☺`, "☺b☺"},
{`\u0000`, "", nil},
{`\u0000☺\u0000b☺`, "☺b☺", nil},
// NOTE: we are not handling the following:
// {"\u0000", ""},
// {"\u0000☺\u0000b☺", "☺b☺"},

{"", ""},
{"abc", "abc"},
{"\uFDDD", "\uFDDD"},
{"a\xffb", "a" + ch(1) + "b"},
{"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD"},
{"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1)},
{"\xC0\xAF", ch(2)},
{"\xE0\x80\xAF", ch(3)},
{"\xed\xa0\x80", ch(3)},
{"\xed\xbf\xbf", ch(3)},
{"\xF0\x80\x80\xaf", ch(4)},
{"\xF8\x80\x80\x80\xAF", ch(5)},
{"\xFC\x80\x80\x80\x80\xAF", ch(6)},
{"", "", nil},
{"abc", "abc", nil},
{"\uFDDD", "\uFDDD", nil},
{"a\xffb", "a" + ch(1) + "b", nil},
{"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", nil},
{"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), nil},
{"\xC0\xAF", ch(2), nil},
{"\xE0\x80\xAF", ch(3), nil},
{"\xed\xa0\x80", ch(3), nil},
{"\xed\xbf\xbf", ch(3), nil},
{"\xF0\x80\x80\xaf", ch(4), nil},
{"\xF8\x80\x80\x80\xAF", ch(5), nil},
{"\xFC\x80\x80\x80\x80\xAF", ch(6), nil},

// {"\ud800", ""},
{`\ud800`, ch(1), nil},
{`\uDEAD`, ch(1), nil},

{`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil},
{`\uD83D\ude04`, "😄", nil},

{`\u4e2d\u6587`, "中文", nil},
{`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", nil},

{`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)},
{`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)},
}

err := jobDB.Setup(ReadWrite, false, strings.ToLower(rand.String(5)))
require.NoError(t, err)
defer jobDB.TearDown()

eventPayload := []byte(`{"batch": [{"anonymousId":"anon_id","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`)
customVal := "MOCKDS"
var jobs []*JobT
for _, tt := range toValidUTF8Tests {
jobs = append(jobs, &JobT{
for i, tt := range toValidUTF8Tests {

customVal := fmt.Sprintf("TEST_%d", i)

jobs := []*JobT{{
Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`),
EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1),
UserID: uuid.New().String(),
UUID: uuid.New(),
CustomVal: customVal,
WorkspaceId: defaultWorkspaceID,
EventCount: 1,
})
}
}}

err := jobDB.Setup(ReadWrite, false, strings.ToLower(rand.String(5)))
require.NoError(t, err)
defer jobDB.TearDown()
err := jobDB.Store(context.Background(), jobs)
if tt.err != nil {
require.Error(t, err, "should error")
require.Contains(t, err.Error(), tt.err.Error(), "should contain error")
continue
}

err = jobDB.Store(context.Background(), jobs)
require.NoError(t, err)
require.NoError(t, err)

unprocessedJob, err := jobDB.GetUnprocessed(context.Background(), GetQueryParams{
CustomValFilters: []string{customVal},
JobsLimit: 100,
ParameterFilters: []ParameterFilterT{},
})
require.NoError(t, err, "should not error")
unprocessedJob, err := jobDB.GetUnprocessed(context.Background(), GetQueryParams{
CustomValFilters: []string{customVal},
JobsLimit: 10,
ParameterFilters: []ParameterFilterT{},
})
require.NoError(t, err, "should not error")

require.Len(t, unprocessedJob.Jobs, 1)

for i, tt := range toValidUTF8Tests {
require.JSONEq(t,
string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)),
string(unprocessedJob.Jobs[i].EventPayload),
string(unprocessedJob.Jobs[0].EventPayload),
)
}
}
Expand Down
64 changes: 50 additions & 14 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ import (

"golang.org/x/sync/errgroup"

jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-go-kit/bytesize"
kitutf8 "github.com/rudderlabs/rudder-go-kit/utf8"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/jobsdb/internal/cache"
Expand All @@ -58,7 +58,10 @@ import (
"github.com/lib/pq"
)

var errStaleDsList = errors.New("stale dataset list")
var (
errStaleDsList = errors.New("stale dataset list")
jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary
)

const (
pgReadonlyTableExceptionFuncName = "readonly_table_exception()"
Expand Down Expand Up @@ -370,9 +373,18 @@ type ConnectionDetails struct {
DestinationID string
}

func (r *JobStatusT) sanitizeJson() {
r.ErrorResponse = sanitizeJSON(r.ErrorResponse)
r.Parameters = sanitizeJSON(r.Parameters)
func (r *JobStatusT) sanitizeJson() error {
var err error
r.ErrorResponse, err = sanitizeJSON(r.ErrorResponse)
if err != nil {
return err
}

r.Parameters, err = sanitizeJSON(r.Parameters)
if err != nil {
return err
}
return nil
}

/*
Expand All @@ -399,9 +411,17 @@ func (job *JobT) String() string {
return fmt.Sprintf("JobID=%v, UserID=%v, CreatedAt=%v, ExpireAt=%v, CustomVal=%v, Parameters=%v, EventPayload=%v EventCount=%d", job.JobID, job.UserID, job.CreatedAt, job.ExpireAt, job.CustomVal, string(job.Parameters), string(job.EventPayload), job.EventCount)
}

func (job *JobT) sanitizeJSON() {
job.EventPayload = sanitizeJSON(job.EventPayload)
job.Parameters = sanitizeJSON(job.Parameters)
func (job *JobT) sanitizeJSON() error {
var err error
job.EventPayload, err = sanitizeJSON(job.EventPayload)
if err != nil {
return err
}
job.Parameters, err = sanitizeJSON(job.Parameters)
if err != nil {
return err
}
return nil
}

// The struct fields need to be exposed to JSON package
Expand Down Expand Up @@ -1919,7 +1939,10 @@ func (jd *Handle) doStoreJobsInTx(ctx context.Context, tx *Tx, ds dataSetT, jobL
return err
}
for i := range jobList {
jobList[i].sanitizeJSON()
err = jobList[i].sanitizeJSON()
if err != nil {
return fmt.Errorf("sanitizeJSON: %w", err)
}
}
return store()
}
Expand Down Expand Up @@ -2227,7 +2250,10 @@ func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT
return
}
for i := range statusList {
statusList[i].sanitizeJson()
err = statusList[i].sanitizeJson()
if err != nil {
return
}
}
err = store()
}
Expand Down Expand Up @@ -3142,15 +3168,25 @@ func (jd *Handle) GetLastJob(ctx context.Context) *JobT {

// sanitizeJSON makes a json payload safe for writing into postgres.
// 1. Removes any \u0000 string from the payload
// 2. Replaces any invalid utf8 characters using github.com/rudderlabs/rudder-go-kit/utf8
func sanitizeJSON(input json.RawMessage) json.RawMessage {
// ~2. Replaces any invalid utf8 characters using github.com/rudderlabs/rudder-go-kit/utf8~
// 3. unmashals and marshals the payload to remove any extra keys
func sanitizeJSON(input json.RawMessage) (json.RawMessage, error) {
v := bytes.ReplaceAll(input, []byte(`\u0000`), []byte(""))
if len(v) == 0 {
v = []byte(`{}`)
}
kitutf8.Sanitize(v)

return v
var a any
err := jsonfast.Unmarshal(v, &a)
if err != nil {
return nil, err
}
v, err = jsonfast.Marshal(a)
if err != nil {
return nil, err
}

return v, nil
}

type smallDS struct {
Expand Down

0 comments on commit 4155fa2

Please sign in to comment.