Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 260 additions & 17 deletions src/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import { createHmacSignature, createAsymmetricSignature } from 'signature'

const endpoints = {
base: 'wss://stream.binance.com:9443/ws',
futures: 'wss://fstream.binance.com/ws',
futures: 'wss://fstream.binance.com/market/ws',
futuresPublic: 'wss://fstream.binance.com/public/ws',
futuresMarket: 'wss://fstream.binance.com/market/ws',
futuresPrivate: 'wss://fstream.binance.com/private/ws',
delivery: 'wss://dstream.binance.com/ws',
}

Expand Down Expand Up @@ -56,9 +59,13 @@ const depth = (payload, cb, transform = true, variator) => {
const cache = (Array.isArray(payload) ? payload : [payload]).map(symbol => {
const [symbolName, updateSpeed] = symbol.toLowerCase().split('@')
const w = openWebSocket(
`${variator ? endpoints[variator] : endpoints.base}/${symbolName}@depth${
updateSpeed ? `@${updateSpeed}` : ''
}`,
`${
variator === 'futures'
? endpoints.futuresPublic
: variator
? endpoints[variator]
: endpoints.base
}/${symbolName}@depth${updateSpeed ? `@${updateSpeed}` : ''}`,
)
w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)
Expand Down Expand Up @@ -86,7 +93,7 @@ const depth = (payload, cb, transform = true, variator) => {
const futuresRpiDepth = (payload, cb, transform = true) => {
const cache = (Array.isArray(payload) ? payload : [payload]).map(symbol => {
const symbolName = symbol.toLowerCase()
const w = openWebSocket(`${endpoints.futures}/${symbolName}@rpiDepth@500ms`)
const w = openWebSocket(`${endpoints.futuresPublic}/${symbolName}@rpiDepth@500ms`)
w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)
cb(transform ? futuresDepthTransform(obj) : obj)
Expand Down Expand Up @@ -140,9 +147,13 @@ const partialDepth = (payload, cb, transform = true, variator) => {
const cache = (Array.isArray(payload) ? payload : [payload]).map(({ symbol, level }) => {
const [symbolName, updateSpeed] = symbol.toLowerCase().split('@')
const w = openWebSocket(
`${variator ? endpoints[variator] : endpoints.base}/${symbolName}@depth${level}${
updateSpeed ? `@${updateSpeed}` : ''
}`,
`${
variator === 'futures'
? endpoints.futuresPublic
: variator
? endpoints[variator]
: endpoints.base
}/${symbolName}@depth${level}${updateSpeed ? `@${updateSpeed}` : ''}`,
)
w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)
Expand Down Expand Up @@ -211,7 +222,11 @@ const candles = (payload, interval, cb, transform = true, variator) => {
const cache = (Array.isArray(payload) ? payload : [payload]).map(symbol => {
const w = openWebSocket(
`${
variator ? endpoints[variator] : endpoints.base
variator === 'futures'
? endpoints.futuresMarket
: variator === 'delivery'
? endpoints.delivery
: endpoints.base
}/${symbol.toLowerCase()}@kline_${interval}`,
)
w.onmessage = msg => {
Expand Down Expand Up @@ -367,7 +382,7 @@ const ticker = (payload, cb, transform = true, variator) => {
const w = openWebSocket(
`${
variator === 'futures'
? endpoints.futures
? endpoints.futuresMarket
: variator === 'delivery'
? endpoints.delivery
: endpoints.base
Expand Down Expand Up @@ -400,7 +415,7 @@ const allTickers = (cb, transform = true, variator) => {
const w = new openWebSocket(
`${
variator === 'futures'
? endpoints.futures
? endpoints.futuresMarket
: variator === 'delivery'
? endpoints.delivery
: endpoints.base
Expand Down Expand Up @@ -550,7 +565,7 @@ const aggTrades = (payload, cb, transform = true, variator) => {
const w = openWebSocket(
`${
variator === 'futures'
? endpoints.futures
? endpoints.futuresMarket
: variator === 'delivery'
? endpoints.delivery
: endpoints.base
Expand Down Expand Up @@ -593,7 +608,7 @@ const futuresLiqsTransform = m => ({

const futuresLiquidations = (payload, cb, transform = true) => {
const cache = (Array.isArray(payload) ? payload : [payload]).map(symbol => {
const w = openWebSocket(`${endpoints.futures}/${symbol.toLowerCase()}@forceOrder`)
const w = openWebSocket(`${endpoints.futuresMarket}/${symbol.toLowerCase()}@forceOrder`)
w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)

Expand All @@ -610,7 +625,7 @@ const futuresLiquidations = (payload, cb, transform = true) => {
}

const futuresAllLiquidations = (cb, transform = true) => {
const w = new openWebSocket(`${endpoints.futures}/!forceOrder@arr`)
const w = new openWebSocket(`${endpoints.futuresMarket}/!forceOrder@arr`)

w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)
Expand All @@ -620,6 +635,218 @@ const futuresAllLiquidations = (cb, transform = true) => {
return options => w.close(1000, 'Close handle was called', { keepClosed: true, ...options })
}

const futuresBookTicker = (payload, cb, transform = true) => {
const cache = (Array.isArray(payload) ? payload : [payload]).map(symbol => {
const w = openWebSocket(`${endpoints.futuresPublic}/${symbol.toLowerCase()}@bookTicker`)

w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)
cb(transform ? bookTickerTransform(obj) : obj)
}

return w
})

return options =>
cache.forEach(w =>
w.close(1000, 'Close handle was called', { keepClosed: true, ...options }),
)
}

const futuresAllBookTickers = (cb, transform = true) => {
const w = openWebSocket(`${endpoints.futuresPublic}/!bookTicker`)

w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)
cb(transform ? bookTickerTransform(obj) : obj)
}

return options => w.close(1000, 'Close handle was called', { keepClosed: true, ...options })
}

const futuresMarkPriceTransform = m => ({
eventType: m.e,
eventTime: m.E,
symbol: m.s,
markPrice: m.p,
indexPrice: m.i,
settlePrice: m.P,
fundingRate: m.r,
nextFundingRate: m.T,
})

const futuresMarkPrice = (payload, cb, transform = true) => {
const cache = (Array.isArray(payload) ? payload : [payload]).map(input => {
const symbol = typeof input === 'object' ? input.symbol : input
const updateSpeed = typeof input === 'object' ? input.updateSpeed : undefined
const stream =
updateSpeed === '1s'
? `${symbol.toLowerCase()}@markPrice@1s`
: `${symbol.toLowerCase()}@markPrice`

const w = openWebSocket(`${endpoints.futuresMarket}/${stream}`)

w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)
cb(transform ? futuresMarkPriceTransform(obj) : obj)
}

return w
})

return options =>
cache.forEach(w =>
w.close(1000, 'Close handle was called', { keepClosed: true, ...options }),
)
}

const futuresContinuousCandles = (payload, interval, cb, transform = true) => {
if (!interval || !cb) {
throw new Error('Please pass a pair, contractType, interval and callback.')
}

const pair = payload.pair.toLowerCase()
const contractType = payload.contractType.toLowerCase()

const w = openWebSocket(
`${endpoints.futuresMarket}/${pair}_${contractType}@continuousKline_${interval}`,
)

w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)
const { e: eventType, E: eventTime, ps: pairSymbol, ct: contType, k: tick } = obj

cb(
transform
? {
eventType,
eventTime,
pair: pairSymbol,
contractType: contType,
...candleTransform(tick),
}
: obj,
)
}

return options => w.close(1000, 'Close handle was called', { keepClosed: true, ...options })
}

const futuresCompositeIndex = (payload, cb, transform = true) => {
const cache = (Array.isArray(payload) ? payload : [payload]).map(symbol => {
const w = openWebSocket(`${endpoints.futuresMarket}/${symbol.toLowerCase()}@compositeIndex`)

w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)
cb(
transform
? {
eventType: obj.e,
eventTime: obj.E,
symbol: obj.s,
price: obj.p,
composition: obj.c
? obj.c.map(c => ({
baseAsset: c.b,
quoteAsset: c.q,
weightInQuantity: c.w,
weightInPercentage: c.W,
indexPrice: c.i,
}))
: [],
}
: obj,
)
}

return w
})

return options =>
cache.forEach(w =>
w.close(1000, 'Close handle was called', { keepClosed: true, ...options }),
)
}

const futuresContractInfo = (cb, transform = true) => {
const w = openWebSocket(`${endpoints.futuresMarket}/!contractInfo`)

w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)
cb(
transform
? {
eventType: obj.e,
eventTime: obj.E,
symbol: obj.s,
pair: obj.ps,
contractType: obj.ct,
deliveryDate: obj.dt,
onboardDate: obj.ot,
contractStatus: obj.cs,
brackets: obj.bks
? obj.bks.map(b => ({
notionalBracket: b.bs,
floorNotional: b.bnf,
capNotional: b.bnc,
maintenanceRatio: b.mmr,
auxiliaryNumber: b.cf,
minLeverage: b.mi,
maxLeverage: b.ma,
}))
: [],
}
: obj,
)
}

return options => w.close(1000, 'Close handle was called', { keepClosed: true, ...options })
}

const futuresAssetIndexTransform = m => ({
eventType: m.e,
eventTime: m.E,
symbol: m.s,
index: m.i,
bidBuffer: m.b,
askBuffer: m.a,
bidRate: m.B,
askRate: m.A,
autoExchangeBidBuffer: m.q,
autoExchangeAskBuffer: m.Q,
autoExchangeBidRate: m.g,
autoExchangeAskRate: m.G,
})

const futuresAssetIndex = (payload, cb, transform = true) => {
const cache = (Array.isArray(payload) ? payload : [payload]).map(symbol => {
const w = openWebSocket(`${endpoints.futuresMarket}/${symbol.toLowerCase()}@assetIndex`)

w.onmessage = msg => {
const obj = JSONbig.parse(msg.data)
cb(transform ? futuresAssetIndexTransform(obj) : obj)
}

return w
})

return options =>
cache.forEach(w =>
w.close(1000, 'Close handle was called', { keepClosed: true, ...options }),
)
}

const futuresAllAssetIndex = (cb, transform = true) => {
const w = openWebSocket(`${endpoints.futuresMarket}/!assetIndex@arr`)

w.onmessage = msg => {
const arr = JSONbig.parse(msg.data)
cb(transform ? arr.map(futuresAssetIndexTransform) : arr)
}

return options => w.close(1000, 'Close handle was called', { keepClosed: true, ...options })
}

const tradesTransform = m => ({
eventType: m.e,
eventTime: m.E,
Expand Down Expand Up @@ -1225,7 +1452,7 @@ const user = (opts, variator) => (cb, transform) => {
w = openWebSocket(
`${
variator === 'futures'
? endpoints.futures
? endpoints.futuresPrivate
: variator === 'delivery'
? endpoints.delivery
: endpoints.base
Expand Down Expand Up @@ -1282,7 +1509,7 @@ const futuresAllMarkPricesTransform = m =>
const futuresAllMarkPrices = (payload, cb, transform = true) => {
const variant = payload.updateSpeed === '1s' ? '!markPrice@arr@1s' : '!markPrice@arr'

const w = openWebSocket(`${endpoints.futures}/${variant}`)
const w = openWebSocket(`${endpoints.futuresMarket}/${variant}`)

w.onmessage = msg => {
const arr = JSONbig.parse(msg.data)
Expand All @@ -1294,7 +1521,15 @@ const futuresAllMarkPrices = (payload, cb, transform = true) => {

export default opts => {
if (opts && opts.wsBase) endpoints.base = opts.wsBase
if (opts && opts.wsFutures) endpoints.futures = opts.wsFutures
if (opts && opts.wsFutures) {
endpoints.futures = opts.wsFutures
endpoints.futuresPublic = opts.wsFutures
endpoints.futuresMarket = opts.wsFutures
endpoints.futuresPrivate = opts.wsFutures
}
if (opts && opts.wsFuturesPublic) endpoints.futuresPublic = opts.wsFuturesPublic
if (opts && opts.wsFuturesMarket) endpoints.futuresMarket = opts.wsFuturesMarket
if (opts && opts.wsFuturesPrivate) endpoints.futuresPrivate = opts.wsFuturesPrivate
if (opts && opts.wsDelivery) endpoints.delivery = opts.wsDelivery

if (opts && opts.proxy) {
Expand Down Expand Up @@ -1344,6 +1579,14 @@ export default opts => {
aggTrades(payload, cb, transform, 'delivery'),
futuresLiquidations,
futuresAllLiquidations,
futuresBookTicker,
futuresAllBookTickers,
futuresMarkPrice,
futuresContinuousCandles,
futuresCompositeIndex,
futuresContractInfo,
futuresAssetIndex,
futuresAllAssetIndex,
futuresUser: user(opts, 'futures'),
deliveryUser: user(opts, 'delivery'),
futuresCustomSubStream: (payload, cb) => customSubStream(payload, cb, 'futures'),
Expand Down
2 changes: 1 addition & 1 deletion test/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import http from 'http'

export const checkFields = (t, object, fields) => {
fields.forEach(field => {
t.truthy(object[field])
t.truthy(field in object)
})
}

Expand Down
Loading
Loading