Skip to content
This repository has been archived by the owner on Nov 29, 2023. It is now read-only.

chore: manually provide ip addresses to index provider #382

Merged
merged 4 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ keystore_path = "~/.ursa/keystore"
identity = "default"

[provider_config]
domain = "example.domain"
# Public IP address of the node
addresses = ["/ip4/127.0.0.1/tcp/4069"]
indexer_url = "https://dev.cid.contact"
database_path = "~/.ursa/data/index_provider_db"

Expand Down
2 changes: 1 addition & 1 deletion crates/ursa-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Default for GatewayConfig {
stream_buf: 2_000_000, // 2MB
},
indexer: IndexerConfig {
cid_url: "https://cid.contact/cid".into(),
cid_url: "https://dev.cid.contact/cid".into(),
},
}
}
Expand Down
48 changes: 12 additions & 36 deletions crates/ursa-index-provider/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use axum::{body::Body, extract::Path, response::Response, routing::get, Extensio
use crate::provider::ProviderError;
use fvm_ipld_blockstore::Blockstore;
use libipld::Cid;
use libp2p::{gossipsub::TopicHash, identity::Keypair, multiaddr::Protocol, Multiaddr, PeerId};
use libp2p::{gossipsub::TopicHash, identity::Keypair, Multiaddr, PeerId};
use std::{collections::VecDeque, str::FromStr, sync::Arc};
use tracing::{error, info, warn};
use ursa_store::UrsaStore;
Expand Down Expand Up @@ -91,9 +91,8 @@ pub struct ProviderEngine<S> {
command_receiver: Receiver<ProviderCommand>,
/// network command sender for communication with libp2p node
network_command_sender: Sender<NetworkCommand>,
/// Server from which advertised content is retrievable.
server_address: Multiaddr,
domain: Multiaddr,
/// List of addresses to submit to indexer.
addresses: Vec<Multiaddr>,
}

impl<S> ProviderEngine<S>
Expand All @@ -106,8 +105,7 @@ where
provider_store: Arc<UrsaStore<S>>,
config: ProviderConfig,
network_command_sender: Sender<NetworkCommand>,
server_address: Multiaddr,
domain: Multiaddr,
addresses: Vec<Multiaddr>,
) -> Self {
let (command_sender, command_receiver) = unbounded_channel();
ProviderEngine {
Expand All @@ -117,8 +115,7 @@ where
network_command_sender,
provider: Provider::new(keypair, provider_store),
store,
server_address,
domain,
addresses,
}
}
pub fn command_sender(&self) -> Sender<ProviderCommand> {
Expand Down Expand Up @@ -166,7 +163,7 @@ where
} else {
match self
.provider
.create_announce_message(peer_id, self.domain.clone())
.create_announce_message(peer_id, &mut self.addresses)
{
Ok(announce_message) => {
if let Err(e) = self
Expand All @@ -192,40 +189,19 @@ where
}

pub async fn publish_local(&mut self, root_cid: Cid, file_size: u64) -> Result<()> {
let (listener_addresses_sender, listener_addresses_receiver) = oneshot::channel();
self.network_command_sender
.send(NetworkCommand::GetListenerAddresses {
sender: listener_addresses_sender,
})?;

let context_id = root_cid.to_bytes();
info!(
"Creating advertisement for cids under root cid: {:?}.",
root_cid
);
let peer_id = PeerId::from(self.provider.keypair().public());
let addresses = self
.addresses
.iter()
.map(|address| address.to_string())
.collect();

let listener_addresses = listener_addresses_receiver.await?;
let mut addresses = vec![self.server_address.to_string()];
for la in listener_addresses {
let mut address = Multiaddr::empty();
for protocol in la.into_iter() {
match protocol {
Protocol::Ip6(ip) => address.push(Protocol::Ip6(ip)),
Protocol::Ip4(ip) => address.push(Protocol::Ip4(ip)),
Protocol::Tcp(port) => address.push(Protocol::Tcp(port)),
_ => {}
}
}
addresses.push(address.to_string())
}
let advertisement = Advertisement::new(
context_id.clone(),
peer_id,
addresses.clone(),
false,
file_size,
);
let advertisement = Advertisement::new(context_id, peer_id, addresses, false, file_size);
let provider_id = self.provider.create(advertisement)?;

let dag = self.store.dag_traversal(&(root_cid))?;
Expand Down
26 changes: 20 additions & 6 deletions crates/ursa-index-provider/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ pub trait ProviderInterface: Sync + Send + 'static {
fn create(&mut self, ad: Advertisement) -> Result<usize>;
fn add_chunk(&mut self, bytes: Vec<u8>, id: usize) -> Result<()>;
fn publish(&mut self, id: usize) -> Result<Advertisement>;
fn create_announce_message(&mut self, peer_id: PeerId, domain: Multiaddr) -> Result<Vec<u8>>;
fn create_announce_message(
&mut self,
peer_id: PeerId,
addresses: &mut Vec<Multiaddr>,
) -> Result<Vec<u8>>;
}

impl<S> ProviderInterface for Provider<S>
Expand Down Expand Up @@ -157,15 +161,24 @@ where
Err(anyhow!("ad not found"))
}

fn create_announce_message(&mut self, peer_id: PeerId, domain: Multiaddr) -> Result<Vec<u8>> {
let mut multiaddr = domain;
multiaddr.push(Protocol::Http);
multiaddr.push(Protocol::P2p(peer_id.into()));
fn create_announce_message(
&mut self,
peer_id: PeerId,
addresses: &mut Vec<Multiaddr>,
) -> Result<Vec<u8>> {
let multiaddrs = addresses
.iter_mut()
.map(|address| {
address.push(Protocol::Http);
address.push(Protocol::P2p(peer_id.into()));
address.clone()
})
.collect::<Vec<Multiaddr>>();

if let Some(head_cid) = *self.head.read().unwrap() {
let message = Message {
Cid: head_cid,
Addrs: vec![multiaddr],
Addrs: multiaddrs,
ExtraData: *b"",
};

Expand All @@ -184,6 +197,7 @@ pub struct Message {
pub Addrs: Vec<Multiaddr>,
pub ExtraData: [u8; 0],
}

impl Cbor for Message {
fn marshal_cbor(&self) -> Result<Vec<u8>, fvm_ipld_encoding::Error> {
const MESSAGE_BUFFER_LENGTH: [u8; 1] = [131];
Expand Down
8 changes: 2 additions & 6 deletions crates/ursa-index-provider/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;

use crate::{config::ProviderConfig, engine::ProviderEngine};
use db::MemoryDB;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
use libp2p::{identity::Keypair, PeerId};
use simple_logger::SimpleLogger;
use tokio::task;
use tracing::{info, log::LevelFilter};
Expand Down Expand Up @@ -44,17 +44,13 @@ pub fn provider_engine_init(
let index_store = get_store();

let service = UrsaService::new(keypair.clone(), &network_config, Arc::clone(&store))?;

let server_address = Multiaddr::try_from("/ip4/0.0.0.0/tcp/0").unwrap();

let provider_engine = ProviderEngine::new(
keypair,
store,
index_store,
ProviderConfig::default(),
service.command_sender(),
server_address,
"/ip4/127.0.0.1/tcp/4069".parse().unwrap(),
vec!["/ip4/127.0.0.1/tcp/4069".parse().unwrap()],
);

let router = provider_engine.router();
Expand Down
12 changes: 6 additions & 6 deletions crates/ursa-rpc-service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Debug)]
pub struct ServerConfig {
/// Domain Multiaddress of the node, eg. `/dns/test-node.ursa.earth`
#[serde(default = "ServerConfig::default_domain")]
pub domain: Multiaddr,
/// Public IP address of the node, eg. `/ip4/127.0.0.1`
#[serde(default = "ServerConfig::default_addresses")]
pub addresses: Vec<Multiaddr>,
/// Port to listen on
#[serde(default = "ServerConfig::default_port")]
pub port: u16,
Expand All @@ -17,8 +17,8 @@ pub struct ServerConfig {
}

impl ServerConfig {
fn default_domain() -> Multiaddr {
"/ip4/127.0.0.1/tcp/4069".parse().unwrap()
fn default_addresses() -> Vec<Multiaddr> {
vec!["/ip4/127.0.0.1/tcp/4069".parse().unwrap()]
}
fn default_port() -> u16 {
4069
Expand All @@ -31,7 +31,7 @@ impl ServerConfig {
impl Default for ServerConfig {
fn default() -> Self {
Self {
domain: Self::default_domain(),
addresses: Self::default_addresses(),
port: Self::default_port(),
addr: Self::default_addr(),
origin: Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions crates/ursa-rpc-service/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
api::NodeNetworkInterface,
config::ServerConfig,
http,
rpc::{routes, RpcServer},
rpc::{self, RpcServer},
service::MultiplexService,
};
use tracing::info;
Expand Down Expand Up @@ -56,7 +56,7 @@ where

pub fn rpc_app(&self) -> Router {
Router::new()
.merge(routes::network::init())
.merge(rpc::routes::network::init())
.layer(Extension(self.rpc_server.clone()))
}

Expand Down
5 changes: 1 addition & 4 deletions crates/ursa-rpc-service/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use anyhow::Result;
use axum::{headers::HeaderMap, routing::get, Router};
use db::MemoryDB;
use libp2p::identity::Keypair;
use libp2p::Multiaddr;
use simple_logger::SimpleLogger;
use std::sync::Arc;
use tracing::{log::LevelFilter, warn};
Expand Down Expand Up @@ -44,16 +43,14 @@ pub fn init() -> InitResult {
};
let keypair = Keypair::generate_ed25519();
let service = UrsaService::new(keypair.clone(), &network_config, Arc::clone(&store))?;
let server_address = Multiaddr::try_from("/ip4/0.0.0.0/tcp/0").unwrap();

let provider_engine = ProviderEngine::new(
keypair,
Arc::clone(&store),
get_store(),
ProviderConfig::default(),
service.command_sender(),
server_address,
"/ip4/127.0.0.1/tcp/4069".parse().unwrap(),
vec!["/ip4/127.0.0.1/tcp/4069".parse().unwrap()],
b0xtch marked this conversation as resolved.
Show resolved Hide resolved
);

Ok((service, provider_engine, store))
Expand Down
10 changes: 1 addition & 9 deletions crates/ursa/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,7 @@ impl UrsaConfig {
let mut raw = String::new();
file.read_to_string(&mut raw)?;

let mut config: UrsaConfig =
toml::from_str(&raw).context("Failed to parse config file")?;

// TEMP: remove this after some time
if let Some(domain) = config.provider_config.domain.clone() {
warn!("Config `provider_config.domain` depreciated, moving value to `server_config.domain`");
config.server_config.domain = domain.parse().unwrap();
config.provider_config.domain = None;
}
b0xtch marked this conversation as resolved.
Show resolved Hide resolved
let config: UrsaConfig = toml::from_str(&raw).context("Failed to parse config file")?;

// check if we modified the config at all
let config_str = toml::to_string(&config)?;
Expand Down
10 changes: 1 addition & 9 deletions crates/ursa/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use anyhow::Result;
use db::{rocks::RocksDb, rocks_config::RocksDbConfig};
use dotenv::dotenv;
use libp2p::multiaddr::Protocol;
use libp2p::Multiaddr;
use resolve_path::PathResolveExt;
use std::env;
use std::sync::Arc;
Expand Down Expand Up @@ -94,21 +93,14 @@ async fn main() -> Result<()> {
)
.expect("Opening provider RocksDB must succeed");

let server_address = Multiaddr::try_from(format!(
"/ip4/{}/tcp/{}",
server_config.addr, server_config.port
))
.expect("Server to have a valid address");

let index_store = Arc::new(UrsaStore::new(Arc::clone(&Arc::new(provider_db))));
let index_provider_engine = ProviderEngine::new(
keypair,
Arc::clone(&store),
index_store,
provider_config,
service.command_sender(),
server_address,
server_config.domain.clone(),
server_config.addresses.clone(),
);
let index_provider_router = index_provider_engine.router();

Expand Down