Skip to content

Commit

Permalink
fix(@embark/core): Proxy support for raw transactions
Browse files Browse the repository at this point in the history
Contract logs were not occuring in the test_app for reasons unknown, except that it seemed to be pinned to the fact that accounts were being defined using a mnemonic. Turns out this caused transactions to be signed, and therefore sent using `eth_sendRawTransaction`.

Add ability to decode raw transactions in the proxy so that those transactions are also logged.

Refactor Proxy in to a class and add a Proxy unit test test.

Remove RLP package, as there is another way to decode a tx without needed an additional package.

Replace eth transaction methods with constants throughout the codebase.

Add unit tests for Console Listener.
  • Loading branch information
emizzle authored and iurimatias committed Dec 20, 2018
1 parent f1206b4 commit ffcff4a
Show file tree
Hide file tree
Showing 9 changed files with 662 additions and 206 deletions.
7 changes: 6 additions & 1 deletion src/lib/constants.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@
"rinkeby": 4
},
"gasAllowanceError": "Returned error: gas required exceeds allowance or always failing transaction",
"gasAllowanceErrorMessage": "Failing call, this could be because of invalid inputs or function guards that may have been triggered, or an unknown error."
"gasAllowanceErrorMessage": "Failing call, this could be because of invalid inputs or function guards that may have been triggered, or an unknown error.",
"transactionMethods": {
"eth_sendTransaction": "eth_sendTransaction",
"eth_sendRawTransaction": "eth_sendRawTransaction",
"eth_getTransactionReceipt": "eth_getTransactionReceipt"
}
},
"storage": {
"init": "init",
Expand Down
2 changes: 1 addition & 1 deletion src/lib/modules/blockchain_connector/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class BlockchainConnector {
let newParams = Object.assign({}, payload.params[0]);
let newPayload = {
id: payload.id + 1,
method: 'eth_sendTransaction',
method: constants.blockchain.transactionMethods.eth_sendTransaction,
params: [newParams],
jsonrpc: payload.jsonrpc
};
Expand Down
2 changes: 1 addition & 1 deletion src/lib/modules/blockchain_connector/provider.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class Provider {
}
cb(null, result);
});
} else if (payload.method === 'eth_sendRawTransaction') {
} else if (payload.method === constants.blockchain.transactionMethods.eth_sendRawTransaction) {
return self.runTransaction.push({payload}, cb);
}

Expand Down
6 changes: 3 additions & 3 deletions src/lib/modules/blockchain_process/blockchain.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const utils = require('../../utils/utils.js');
const GethClient = require('./gethClient.js');
const ParityClient = require('./parityClient.js');
const DevFunds = require('./dev_funds.js');
const proxy = require('./proxy');
const Proxy = require('./proxy');
const Ipc = require('../../core/ipc');

const {defaultHost, dockerHostSwap} = require('../../utils/host');
Expand Down Expand Up @@ -154,10 +154,10 @@ Blockchain.prototype.setupProxy = async function () {

let wsProxy;
if (this.config.wsRPC) {
wsProxy = proxy.serve(this.proxyIpc, this.config.wsHost, this.config.wsPort, true, this.config.wsOrigins, addresses, this.certOptions);
wsProxy = new Proxy(this.proxyIpc).serve(this.config.wsHost, this.config.wsPort, true, this.config.wsOrigins, addresses, this.certOptions);
}

[this.rpcProxy, this.wsProxy] = await Promise.all([proxy.serve(this.proxyIpc, this.config.rpcHost, this.config.rpcPort, false, null, addresses, this.certOptions), wsProxy]);
[this.rpcProxy, this.wsProxy] = await Promise.all([new Proxy(this.proxyIpc).serve(this.config.rpcHost, this.config.rpcPort, false, null, addresses, this.certOptions), wsProxy]);
};

Blockchain.prototype.shutdownProxy = function () {
Expand Down
279 changes: 149 additions & 130 deletions src/lib/modules/blockchain_process/proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const utils = require('../../utils/utils');
const WsParser = require('simples/lib/parsers/ws');
const WsWrapper = require('simples/lib/ws/wrapper');
const modifyResponse = require('node-http-proxy-json');
const Transaction = require('ethereumjs-tx');
const ethUtil = require('ethereumjs-util');

const METHODS_TO_MODIFY = {accounts: 'eth_accounts'};

Expand Down Expand Up @@ -48,180 +50,197 @@ const parseJsonMaybe = (string) => {
return object;
};

exports.serve = async (ipc, host, port, ws, origin, accounts, certOptions={}) => {
const commList = {};
const receipts = {};
const transactions = {};
const toModifyPayloads = {};
class Proxy {
constructor(ipc) {
this.ipc = ipc;
this.commList = {};
this.receipts = {};
this.transactions = {};
this.toModifyPayloads = {};
}

const trackRequest = (req) => {
trackRequest(req) {
if (!req) return;
try {
if (Object.values(METHODS_TO_MODIFY).includes(req.method)) {
toModifyPayloads[req.id] = req.method;
this.toModifyPayloads[req.id] = req.method;
}
if (req.method === 'eth_sendTransaction') {
commList[req.id] = {
if (req.method === constants.blockchain.transactionMethods.eth_sendTransaction) {
this.commList[req.id] = {
type: 'contract-log',
address: req.params[0].to,
data: req.params[0].data
};
} else if (req.method === 'eth_getTransactionReceipt') {
if (transactions[req.params[0]]) {
transactions[req.params[0]].receiptId = req.id;
receipts[req.id] = transactions[req.params[0]].commListId;
} else if (req.method === constants.blockchain.transactionMethods.eth_sendRawTransaction) {
const rawData = Buffer.from(ethUtil.stripHexPrefix(req.params[0]), 'hex');
const tx = new Transaction(rawData, 'hex');
this.commList[req.id] = {
type: 'contract-log',
address: '0x' + tx.to.toString('hex'),
data: '0x' + tx.data.toString('hex')
};
} else if (req.method === constants.blockchain.transactionMethods.eth_getTransactionReceipt) {
if (this.transactions[req.params[0]]) {
this.transactions[req.params[0]].receiptId = req.id;
this.receipts[req.id] = this.transactions[req.params[0]].commListId;
}
}
} catch (e) {
console.error(
`Proxy: Error tracking request message '${JSON.stringify(req)}'`,
);
}
};
}

const trackResponse = (res) => {
trackResponse(res) {
if (!res) return;
try {
if (commList[res.id]) {
commList[res.id].transactionHash = res.result;
transactions[res.result] = {commListId: res.id};
} else if (receipts[res.id] && res.result && res.result.blockNumber) {
if (this.commList[res.id]) {
this.commList[res.id].transactionHash = res.result;
this.transactions[res.result] = {
commListId: res.id
};
} else if (this.receipts[res.id] && res.result && res.result.blockNumber) {
// TODO find out why commList[receipts[res.id]] is sometimes not defined
if (!commList[receipts[res.id]]) {
commList[receipts[res.id]] = {};
if (!this.commList[this.receipts[res.id]]) {
this.commList[this.receipts[res.id]] = {};
}
commList[receipts[res.id]].blockNumber = res.result.blockNumber;
commList[receipts[res.id]].gasUsed = res.result.gasUsed;
commList[receipts[res.id]].status = res.result.status;
this.commList[this.receipts[res.id]].blockNumber = res.result.blockNumber;
this.commList[this.receipts[res.id]].gasUsed = res.result.gasUsed;
this.commList[this.receipts[res.id]].status = res.result.status;

if (ipc.connected && !ipc.connecting) {
ipc.request('log', commList[receipts[res.id]]);
if (this.ipc.connected && !this.ipc.connecting) {
this.ipc.request('log', this.commList[this.receipts[res.id]]);
} else {
const message = commList[receipts[res.id]];
ipc.connecting = true;
ipc.connect(() => {
ipc.connecting = false;
ipc.request('log', message);
const message = this.commList[this.receipts[res.id]];
this.ipc.connecting = true;
this.ipc.connect(() => {
this.ipc.connecting = false;
this.ipc.request('log', message);
});
}
delete transactions[commList[receipts[res.id]].transactionHash];
delete commList[receipts[res.id]];
delete receipts[res.id];
delete this.transactions[this.commList[this.receipts[res.id]].transactionHash];
delete this.commList[this.receipts[res.id]];
delete this.receipts[res.id];
}
} catch (e) {
console.error(
`Proxy: Error tracking response message '${JSON.stringify(res)}'`
);
}
};
}

const start = Date.now();
await (function waitOnTarget() {
return new Promise(resolve => {
utils.pingEndpoint(
canonicalHost(host),
port,
ws ? 'ws': false,
'http',
origin ? origin.split(',')[0] : undefined,
(err) => {
if (!err || (Date.now() - start > 10000)) {
resolve();
} else {
utils.timer(250).then(waitOnTarget).then(resolve);
async serve(host, port, ws, origin, accounts, certOptions={}) {
const start = Date.now();
await (function waitOnTarget() {
return new Promise(resolve => {
utils.pingEndpoint(
canonicalHost(host),
port,
ws ? 'ws': false,
'http',
origin ? origin.split(',')[0] : undefined,
(err) => {
if (!err || (Date.now() - start > 10000)) {
resolve();
} else {
utils.timer(250).then(waitOnTarget).then(resolve);
}
}
}
);
});
}());

let proxy = httpProxy.createProxyServer({
ssl: certOptions,
target: {
host: canonicalHost(host),
port: port
},
ws: ws,
createWsServerTransformStream: (_req, _proxyReq, _proxyRes) => {
const parser = new WsParser(0, true);
parser.on('frame', ({data: buffer}) => {
let object = parseJsonMaybe(buffer.toString());
if (object) {
object = modifyPayload(this.toModifyPayloads, object, accounts);
// track the modified response
this.trackResponse(object);
// send the modified response
WsWrapper.wrap(
{connection: dupl, masked: 0},
Buffer.from(JSON.stringify(object)),
() => {}
);
}
});
const dupl = new Duplex({
read(_size) {},
write(chunk, encoding, callback) {
parser.write(chunk);
callback();
}
});
return dupl;
}
});

proxy.on('error', (err) => {
console.error(
__('Proxy: Error forwarding requests to blockchain/simulator'),
err.message
);
});
}());

let proxy = httpProxy.createProxyServer({
ssl: certOptions,
target: {
host: canonicalHost(host),
port: port
},
ws: ws,
createWsServerTransformStream: (_req, _proxyReq, _proxyRes) => {
const parser = new WsParser(0, true);
parser.on('frame', ({data: buffer}) => {
let object = parseJsonMaybe(buffer.toString());
if (object) {
object = modifyPayload(toModifyPayloads, object, accounts);
// track the modified response
trackResponse(object);
// send the modified response
WsWrapper.wrap(
{connection: dupl, masked: 0},
Buffer.from(JSON.stringify(object)),
() => {}
);
}
});
const dupl = new Duplex({
read(_size) {},
write(chunk, encoding, callback) {
parser.write(chunk);
callback();

proxy.on('proxyRes', (proxyRes, req, res) => {
modifyResponse(res, proxyRes, (body) => {
if (body) {
body = modifyPayload(this.toModifyPayloads, body, accounts);
this.trackResponse(body);
}
return body;
});
return dupl;
}
});

proxy.on('error', (err) => {
console.error(
__('Proxy: Error forwarding requests to blockchain/simulator'),
err.message
);
});

proxy.on('proxyRes', (proxyRes, req, res) => {
modifyResponse(res, proxyRes, (body) => {
if (body) {
body = modifyPayload(toModifyPayloads, body, accounts);
trackResponse(body);
}
return body;
});
});

const server = http.createServer((req, res) => {
if (req.method === 'POST') {
// messages TO the target
Asm.connectTo(
pump(req, jsonParser())
).on('done', ({current: object}) => {
trackRequest(object);
});
}

if (!ws) {
proxy.web(req, res);
}
});
const server = http.createServer((req, res) => {
if (req.method === 'POST') {
// messages TO the target
Asm.connectTo(
pump(req, jsonParser())
).on('done', ({current: object}) => {
this.trackRequest(object);
});
}

if (ws) {
server.on('upgrade', (msg, socket, head) => {
proxy.ws(msg, socket, head);
if (!ws) {
proxy.web(req, res);
}
});

proxy.on('open', (_proxySocket) => { /* messages FROM the target */ });
if (ws) {
server.on('upgrade', (msg, socket, head) => {
proxy.ws(msg, socket, head);
});

proxy.on('open', (_proxySocket) => { /* messages FROM the target */ });

proxy.on('proxyReqWs', (_proxyReq, _req, socket) => {
// messages TO the target
pump(socket, new WsParser(0, false)).on('frame', ({data: buffer}) => {
const object = parseJsonMaybe(buffer.toString());
trackRequest(object);
proxy.on('proxyReqWs', (_proxyReq, _req, socket) => {
// messages TO the target
pump(socket, new WsParser(0, false)).on('frame', ({data: buffer}) => {
const object = parseJsonMaybe(buffer.toString());
this.trackRequest(object);
});
});
}

return new Promise(resolve => {
server.listen(
port - constants.blockchain.servicePortOnProxy,
defaultHost,
() => { resolve(server); }
);
});
}
}

return new Promise(resolve => {
server.listen(
port - constants.blockchain.servicePortOnProxy,
defaultHost,
() => { resolve(server); }
);
});
};
module.exports = Proxy;
Loading

0 comments on commit ffcff4a

Please sign in to comment.