Skip to content

Commit

Permalink
refactor: use announcement response schema
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Feb 20, 2024
1 parent 7f2f2ee commit 45d50f5
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 47 deletions.
16 changes: 8 additions & 8 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (c *Client) FindProviders(ctx context.Context, key cid.Cid) (providers iter
// Provide publishes [types.AnnouncementRecord]s based on the given [types.AnnouncementRequests].
// This records will be signed by your provided. Therefore, the [Client] must have been configured
// with [WithProviderInfo].
func (c *Client) Provide(ctx context.Context, announcements ...types.AnnouncementRequest) (iter.ResultIter[*types.AnnouncementRecord], error) {
func (c *Client) Provide(ctx context.Context, announcements ...types.AnnouncementRequest) (iter.ResultIter[*types.AnnouncementResponseRecord], error) {
if err := c.canProvide(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -309,7 +309,7 @@ func (c *Client) Provide(ctx context.Context, announcements ...types.Announcemen

// ProvideRecords publishes the given [types.AnnouncementRecord]. An error will
// be returned if the records aren't signed or valid.
func (c *Client) ProvideRecords(ctx context.Context, records ...*types.AnnouncementRecord) (iter.ResultIter[*types.AnnouncementRecord], error) {
func (c *Client) ProvideRecords(ctx context.Context, records ...*types.AnnouncementRecord) (iter.ResultIter[*types.AnnouncementResponseRecord], error) {
providerRecords := make([]types.Record, len(records))
for i, record := range records {
if err := record.Verify(); err != nil {
Expand All @@ -325,7 +325,7 @@ func (c *Client) ProvideRecords(ctx context.Context, records ...*types.Announcem
return c.provide(ctx, url, req)

Check warning on line 325 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L321-L325

Added lines #L321 - L325 were not covered by tests
}

func (c *Client) provide(ctx context.Context, url string, req interface{}) (iter.ResultIter[*types.AnnouncementRecord], error) {
func (c *Client) provide(ctx context.Context, url string, req interface{}) (iter.ResultIter[*types.AnnouncementResponseRecord], error) {
b, err := drjson.MarshalJSONBytes(req)
if err != nil {
return nil, err

Check warning on line 331 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L331

Added line #L331 was not covered by tests
Expand Down Expand Up @@ -360,19 +360,19 @@ func (c *Client) provide(ctx context.Context, url string, req interface{}) (iter
}
}()

var it iter.ResultIter[*types.AnnouncementRecord]
var it iter.ResultIter[*types.AnnouncementResponseRecord]
switch mediaType {
case mediaTypeJSON:
parsedResp := &jsontypes.AnnouncePeersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
if err != nil {
return nil, err
}

Check warning on line 370 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L369-L370

Added lines #L369 - L370 were not covered by tests
var sliceIt iter.Iter[*types.AnnouncementRecord] = iter.FromSlice(parsedResp.ProvideResults)
var sliceIt iter.Iter[*types.AnnouncementResponseRecord] = iter.FromSlice(parsedResp.ProvideResults)
it = iter.ToResultIter(sliceIt)
case mediaTypeNDJSON:
skipBodyClose = true
it = ndjson.NewAnnouncementRecordsIter(resp.Body)
it = ndjson.NewAnnouncementResponseRecordsIter(resp.Body)
default:
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
return nil, errors.New("unknown content type")

Check warning on line 378 in routing/http/client/client.go

View check run for this annotation

Codecov / codecov/patch

routing/http/client/client.go#L373-L378

Added lines #L373 - L378 were not covered by tests
Expand Down Expand Up @@ -470,7 +470,7 @@ func (c *Client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultI

// ProvidePeer publishes an [types.AnnouncementRecord] with the provider
// information from your peer, configured with [WithProviderInfo].
func (c *Client) ProvidePeer(ctx context.Context, ttl time.Duration, metadata []byte) (iter.ResultIter[*types.AnnouncementRecord], error) {
func (c *Client) ProvidePeer(ctx context.Context, ttl time.Duration, metadata []byte) (iter.ResultIter[*types.AnnouncementResponseRecord], error) {
if err := c.canProvide(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -513,7 +513,7 @@ func (c *Client) ProvidePeer(ctx context.Context, ttl time.Duration, metadata []

// ProvidePeerRecords publishes the given [types.AnnouncementRecord]. An error will
// be returned if the records aren't signed or valid.
func (c *Client) ProvidePeerRecords(ctx context.Context, records ...*types.AnnouncementRecord) (iter.ResultIter[*types.AnnouncementRecord], error) {
func (c *Client) ProvidePeerRecords(ctx context.Context, records ...*types.AnnouncementRecord) (iter.ResultIter[*types.AnnouncementResponseRecord], error) {
providerRecords := make([]types.Record, len(records))
for i, record := range records {
if err := record.Verify(); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions routing/http/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func TestClient_Provide(t *testing.T) {
require.Empty(t, results[0].Error)
}

assert.Equal(t, c.expAdvisoryTTL, results[0].Payload.TTL)
assert.Equal(t, c.expAdvisoryTTL, results[0].TTL)
})
}
}
Expand Down Expand Up @@ -753,7 +753,7 @@ func TestClient_ProvidePeer(t *testing.T) {
require.Empty(t, results[0].Error)
}

assert.Equal(t, c.expAdvisoryTTL, results[0].Payload.TTL)
assert.Equal(t, c.expAdvisoryTTL, results[0].TTL)
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion routing/http/contentrouter/contentrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const ttl = 24 * time.Hour

type Client interface {
FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error)
Provide(ctx context.Context, announcements ...types.AnnouncementRequest) (iter.ResultIter[*types.AnnouncementRecord], error)
Provide(ctx context.Context, announcements ...types.AnnouncementRequest) (iter.ResultIter[*types.AnnouncementResponseRecord], error)
FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[*types.PeerRecord], err error)
GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error)
PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error
Expand Down
16 changes: 8 additions & 8 deletions routing/http/contentrouter/contentrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) (iter.Resul
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
}

func (m *mockClient) Provide(ctx context.Context, announcements ...types.AnnouncementRequest) (iter.ResultIter[*types.AnnouncementRecord], error) {
func (m *mockClient) Provide(ctx context.Context, announcements ...types.AnnouncementRequest) (iter.ResultIter[*types.AnnouncementResponseRecord], error) {
args := m.Called(ctx, announcements)
return args.Get(0).(iter.ResultIter[*types.AnnouncementRecord]), args.Error(1)
return args.Get(0).(iter.ResultIter[*types.AnnouncementResponseRecord]), args.Error(1)
}

func (m *mockClient) FindPeers(ctx context.Context, pid peer.ID) (iter.ResultIter[*types.PeerRecord], error) {
Expand Down Expand Up @@ -76,10 +76,10 @@ func TestProvide(t *testing.T) {
crc := NewContentRoutingClient(client)

if !c.expNotProvided {
res := []*types.AnnouncementRecord{
{Payload: types.AnnouncementPayload{TTL: time.Minute}},
res := []*types.AnnouncementResponseRecord{
{TTL: time.Minute},
}
client.On("Provide", ctx, []types.AnnouncementRequest{{CID: key, TTL: ttl}}).Return(iter.ToResultIter[*types.AnnouncementRecord](iter.FromSlice(res)), nil)
client.On("Provide", ctx, []types.AnnouncementRequest{{CID: key, TTL: ttl}}).Return(iter.ToResultIter[*types.AnnouncementResponseRecord](iter.FromSlice(res)), nil)
}

err := crc.Provide(ctx, key, c.announce)
Expand All @@ -101,10 +101,10 @@ func TestProvideMany(t *testing.T) {
ctx := context.Background()
client := &mockClient{}
crc := NewContentRoutingClient(client)
res := []*types.AnnouncementRecord{
{Payload: types.AnnouncementPayload{TTL: time.Minute}},
res := []*types.AnnouncementResponseRecord{
{TTL: time.Minute},
}
client.On("Provide", ctx, makeBatchAnnouncements(cids, ttl)).Return(iter.ToResultIter[*types.AnnouncementRecord](iter.FromSlice(res)), nil)
client.On("Provide", ctx, makeBatchAnnouncements(cids, ttl)).Return(iter.ToResultIter[*types.AnnouncementResponseRecord](iter.FromSlice(res)), nil)
err := crc.ProvideMany(ctx, mhs)
require.NoError(t, err)
}
Expand Down
27 changes: 12 additions & 15 deletions routing/http/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ func (s *server) providePeers(w http.ResponseWriter, r *http.Request) {
return
}

Check warning on line 282 in routing/http/server/server.go

View check run for this annotation

Codecov / codecov/patch

routing/http/server/server.go#L280-L282

Added lines #L280 - L282 were not covered by tests

responseIter := iter.Map[types.Record, *types.AnnouncementRecord](iter.FromSlice(req.Peers), func(t types.Record) *types.AnnouncementRecord {
resRecord := &types.AnnouncementRecord{
Schema: types.SchemaAnnouncement,
responseIter := iter.Map[types.Record, *types.AnnouncementResponseRecord](iter.FromSlice(req.Peers), func(t types.Record) *types.AnnouncementResponseRecord {
resRecord := &types.AnnouncementResponseRecord{
Schema: types.SchemaAnnouncementResponse,
}

reqRecord, err := s.provideCheckAnnouncement("Provide", t)
Expand All @@ -298,8 +298,7 @@ func (s *server) providePeers(w http.ResponseWriter, r *http.Request) {
return resRecord
}

Check warning on line 299 in routing/http/server/server.go

View check run for this annotation

Codecov / codecov/patch

routing/http/server/server.go#L297-L299

Added lines #L297 - L299 were not covered by tests

resRecord.Payload.TTL = ttl
resRecord.Payload.ID = reqRecord.Payload.ID
resRecord.TTL = ttl
return resRecord
})

Expand All @@ -310,10 +309,10 @@ func (s *server) providePeers(w http.ResponseWriter, r *http.Request) {
}

Check warning on line 309 in routing/http/server/server.go

View check run for this annotation

Codecov / codecov/patch

routing/http/server/server.go#L307-L309

Added lines #L307 - L309 were not covered by tests

if mediaType == mediaTypeNDJSON {
writeResultsIterNDJSON[*types.AnnouncementRecord](w, iter.ToResultIter[*types.AnnouncementRecord](responseIter))
writeResultsIterNDJSON[*types.AnnouncementResponseRecord](w, iter.ToResultIter[*types.AnnouncementResponseRecord](responseIter))
} else {
writeJSONResult(w, "ProvidePeers", jsontypes.AnnouncePeersResponse{
ProvideResults: iter.ReadAll[*types.AnnouncementRecord](responseIter),
ProvideResults: iter.ReadAll[*types.AnnouncementResponseRecord](responseIter),
})
}
}
Expand All @@ -327,9 +326,9 @@ func (s *server) provide(w http.ResponseWriter, r *http.Request) {
return
}

responseIter := iter.Map[types.Record, *types.AnnouncementRecord](iter.FromSlice(req.Providers), func(t types.Record) *types.AnnouncementRecord {
resRecord := &types.AnnouncementRecord{
Schema: types.SchemaAnnouncement,
responseIter := iter.Map[types.Record, *types.AnnouncementResponseRecord](iter.FromSlice(req.Providers), func(t types.Record) *types.AnnouncementResponseRecord {
resRecord := &types.AnnouncementResponseRecord{
Schema: types.SchemaAnnouncementResponse,
}

reqRecord, err := s.provideCheckAnnouncement("Provide", t)
Expand All @@ -344,9 +343,7 @@ func (s *server) provide(w http.ResponseWriter, r *http.Request) {
return resRecord

Check warning on line 343 in routing/http/server/server.go

View check run for this annotation

Codecov / codecov/patch

routing/http/server/server.go#L342-L343

Added lines #L342 - L343 were not covered by tests
}

resRecord.Payload.TTL = ttl
resRecord.Payload.CID = reqRecord.Payload.CID
resRecord.Payload.ID = reqRecord.Payload.ID
resRecord.TTL = ttl
return resRecord
})

Expand All @@ -357,10 +354,10 @@ func (s *server) provide(w http.ResponseWriter, r *http.Request) {
}

Check warning on line 354 in routing/http/server/server.go

View check run for this annotation

Codecov / codecov/patch

routing/http/server/server.go#L352-L354

Added lines #L352 - L354 were not covered by tests

if mediaType == mediaTypeNDJSON {
writeResultsIterNDJSON[*types.AnnouncementRecord](w, iter.ToResultIter[*types.AnnouncementRecord](responseIter))
writeResultsIterNDJSON[*types.AnnouncementResponseRecord](w, iter.ToResultIter[*types.AnnouncementResponseRecord](responseIter))
} else {
writeJSONResult(w, "Provide", jsontypes.AnnounceProvidersResponse{
ProvideResults: iter.ReadAll[*types.AnnouncementRecord](responseIter),
ProvideResults: iter.ReadAll[*types.AnnouncementResponseRecord](responseIter),
})
}
}
Expand Down
8 changes: 4 additions & 4 deletions routing/http/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ func TestProviders(t *testing.T) {
}

t.Run("POST /routing/v1/providers (JSON Response)", func(t *testing.T) {
runPutTest(t, mediaTypeJSON, `{"ProvideResults":[{"Schema":"announcement","Payload":{"CID":"`+cid1Str+`","ID":"`+pid1Str+`","TTL":3600000}},{"Schema":"announcement","Payload":{"CID":"`+cid1Str+`","ID":"`+pid2Str+`","TTL":60000}}]}`)
runPutTest(t, mediaTypeJSON, `{"ProvideResults":[{"Schema":"announcement-response","TTL":3600000},{"Schema":"announcement-response","TTL":60000}]}`)
})

t.Run("POST /routing/v1/providers (NDJSON Response)", func(t *testing.T) {
runPutTest(t, mediaTypeNDJSON, `{"Schema":"announcement","Payload":{"CID":"`+cid1Str+`","ID":"`+pid1Str+`","TTL":3600000}}`+"\n"+`{"Schema":"announcement","Payload":{"CID":"`+cid1Str+`","ID":"`+pid2Str+`","TTL":60000}}`+"\n")
runPutTest(t, mediaTypeNDJSON, `{"Schema":"announcement-response","TTL":3600000}`+"\n"+`{"Schema":"announcement-response","TTL":60000}`+"\n")
})
}

Expand Down Expand Up @@ -376,11 +376,11 @@ func TestPeers(t *testing.T) {
}

t.Run("POST /routing/v1/peers (JSON Response)", func(t *testing.T) {
runPutTest(t, mediaTypeJSON, `{"ProvideResults":[{"Schema":"announcement","Payload":{"ID":"`+pid1.String()+`","TTL":3600000}},{"Schema":"announcement","Payload":{"ID":"`+pid2.String()+`","TTL":60000}}]}`)
runPutTest(t, mediaTypeJSON, `{"ProvideResults":[{"Schema":"announcement-response","TTL":3600000},{"Schema":"announcement-response","TTL":60000}]}`)
})

t.Run("POST /routing/v1/peers (NDJSON Response)", func(t *testing.T) {
runPutTest(t, mediaTypeNDJSON, `{"Schema":"announcement","Payload":{"ID":"`+pid1.String()+`","TTL":3600000}}`+"\n"+`{"Schema":"announcement","Payload":{"ID":"`+pid2.String()+`","TTL":60000}}`+"\n")
runPutTest(t, mediaTypeNDJSON, `{"Schema":"announcement-response","TTL":3600000}`+"\n"+`{"Schema":"announcement-response","TTL":60000}`+"\n")
})
}

Expand Down
9 changes: 8 additions & 1 deletion routing/http/types/json/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ func (r *RecordsArray) UnmarshalJSON(b []byte) error {
return err
}

Check warning on line 49 in routing/http/types/json/responses.go

View check run for this annotation

Codecov / codecov/patch

routing/http/types/json/responses.go#L48-L49

Added lines #L48 - L49 were not covered by tests
*r = append(*r, &prov)
case types.SchemaAnnouncementResponse:
var prov types.AnnouncementResponseRecord

Check warning on line 52 in routing/http/types/json/responses.go

View check run for this annotation

Codecov / codecov/patch

routing/http/types/json/responses.go#L51-L52

Added lines #L51 - L52 were not covered by tests
err := json.Unmarshal(provBytes, &prov)
if err != nil {
return err
}
*r = append(*r, &prov)
default:
*r = append(*r, &readProv)
}
Expand All @@ -58,7 +65,7 @@ func (r *RecordsArray) UnmarshalJSON(b []byte) error {

// AnnounceProvidersResponse is the result of a POST Providers request.
type AnnounceProvidersResponse struct {
ProvideResults []*types.AnnouncementRecord
ProvideResults []*types.AnnouncementResponseRecord
}

// AnnouncePeersResponse is the result of a POST Peers request.
Expand Down
10 changes: 5 additions & 5 deletions routing/http/types/ndjson/records.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func NewRecordsIter(r io.Reader) iter.Iter[iter.Result[types.Record]] {
return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn)
}

// NewAnnouncementRecordsIter returns an iterator that reads [types.AnnouncementRecord]
// from the given [io.Reader]. Records with a different schema are ignored. To read all
// records, use [NewRecordsIter] instead.
func NewAnnouncementRecordsIter(r io.Reader) iter.Iter[iter.Result[*types.AnnouncementRecord]] {
return newFilteredRecords[*types.AnnouncementRecord](r, types.SchemaPeer)
// NewAnnouncementResponseRecordsIter returns an iterator that reads
// [types.AnnouncementResponseRecord] from the given [io.Reader]. Records with
// a different schema are ignored. To read all records, use [NewRecordsIter] instead.
func NewAnnouncementResponseRecordsIter(r io.Reader) iter.Iter[iter.Result[*types.AnnouncementResponseRecord]] {
return newFilteredRecords[*types.AnnouncementResponseRecord](r, types.SchemaPeer)

Check warning on line 51 in routing/http/types/ndjson/records.go

View check run for this annotation

Codecov / codecov/patch

routing/http/types/ndjson/records.go#L50-L51

Added lines #L50 - L51 were not covered by tests
}

// NewPeerRecordsIter returns an iterator that reads [types.PeerRecord] from the given
Expand Down
53 changes: 50 additions & 3 deletions routing/http/types/record_announcement.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ import (
"github.com/multiformats/go-multibase"
)

const SchemaAnnouncement = "announcement"
const announcementSignaturePrefix = "routing-record:"
const (
SchemaAnnouncement = "announcement"
SchemaAnnouncementResponse = "announcement-response"

announcementSignaturePrefix = "routing-record:"
)

var _ Record = &AnnouncementRecord{}

Expand Down Expand Up @@ -195,7 +199,6 @@ func (ap *AnnouncementPayload) UnmarshalJSON(b []byte) error {
// AnnouncementRecord is a [Record] of [SchemaAnnouncement].
type AnnouncementRecord struct {
Schema string
Error string `json:",omitempty"`
Payload AnnouncementPayload
Signature string `json:",omitempty"`
}
Expand Down Expand Up @@ -327,3 +330,47 @@ func makeIPLDMap(mp map[string]ipld.Node) (datamodel.Node, error) {

return nd.Build(), nil
}

var _ Record = &AnnouncementResponseRecord{}

// AnnouncementRecord is a [Record] of [SchemaAnnouncementResponse].
type AnnouncementResponseRecord struct {
Schema string
Error string
TTL time.Duration
}

func (ar *AnnouncementResponseRecord) GetSchema() string {
return ar.Schema

Check warning on line 344 in routing/http/types/record_announcement.go

View check run for this annotation

Codecov / codecov/patch

routing/http/types/record_announcement.go#L343-L344

Added lines #L343 - L344 were not covered by tests
}

func (ar AnnouncementResponseRecord) MarshalJSON() ([]byte, error) {
v := struct {
Schema string
Error string `json:",omitempty"`
TTL int64 `json:",omitempty"`
}{
Schema: ar.Schema,
Error: ar.Error,
TTL: ar.TTL.Milliseconds(),
}

return drjson.MarshalJSONBytes(v)
}

func (ar *AnnouncementResponseRecord) UnmarshalJSON(b []byte) error {
v := struct {
Schema string
Error string
TTL int64
}{}
err := json.Unmarshal(b, &v)
if err != nil {
return err
}

Check warning on line 370 in routing/http/types/record_announcement.go

View check run for this annotation

Codecov / codecov/patch

routing/http/types/record_announcement.go#L369-L370

Added lines #L369 - L370 were not covered by tests

ar.Schema = v.Schema
ar.Error = v.Error
ar.TTL = time.Duration(v.TTL) * time.Millisecond
return nil
}

0 comments on commit 45d50f5

Please sign in to comment.