diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f919eb..62f93e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,21 @@ # Changelog +## 1.2.0 - 2026-02-02 + +### Added +- Added a `restart` flag to the `FixConnector` connector to restart when receiving a `NEWS` message. + +### Updated +- Updated `retrieve_messages_until` method to accept a list of message types. +- Fixed messages parsing issue that caused the error: `Field missing '=' separator`. + ## 1.1.0 - 2025-10-27 + ### Updated - Added parameters `min_price`, `max_price` and `min_price_increment` to `InstrumentList` response. ## 1.0.1 - 2025-05-08 + ### Removed - Removed the references for `auto-reconnect` in the dropcopy session to fix the following [issue](https://github.com/binance/binance-fix-connector-python/issues/2). diff --git a/README.md b/README.md index 6499851..aeb337c 100644 --- a/README.md +++ b/README.md @@ -19,50 +19,29 @@ pip install binance-fix-connector All the FIX messages can be created with the `BinanceFixConnector` class. The following example demonstrates how to create a simple order using the FIX API: ```python import time -import os -from pathlib import Path from binance_fix_connector.fix_connector import create_order_entry_session from binance_fix_connector.utils import get_api_key, get_private_key +from constants import ( + path, + FIX_OE_URL, + INSTRUMENT, + ORD_REJECT_REASON, + ORD_STATUS, + ORD_TYPES, + SIDES, + TIME_IN_FORCE, +) # Credentials -path = config_path = os.path.join( - Path(__file__).parent.resolve(), "..", "config.ini" -) API_KEY, PATH_TO_PRIVATE_KEY_PEM_FILE = get_api_key(path) -# FIX URL -FIX_OE_URL = "tcp+tls://fix-oe.testnet.binance.vision:9000" - -# Response types -ORD_STATUS = { - "0": "NEW", - "1": "PARTIALLY_FILLED", - "2": "FILLED", - "4": "CANCELED", - "6": "PENDING_CANCEL", - "8": "REJECTED", - "A": "PENDING_NEW", - "C": "EXPIRED", -} -ORD_TYPES = {"1": "MARKET", "2": "LIMIT", "3": "STOP", "4": "STOP_LIMIT"} -SIDES = {"1": "BUY", "2": "SELL"} -TIME_IN_FORCE = { - "1": "GOOD_TILL_CANCEL", - "3": "IMMEDIATE_OR_CANCEL", - "4": "FILL_OR_KILL", -} -ORD_REJECT_REASON = {"99": "OTHER"} - -# Parameter -INSTRUMENT = "BNBUSDT" - client_oe = create_order_entry_session( api_key=API_KEY, private_key=get_private_key(PATH_TO_PRIVATE_KEY_PEM_FILE), endpoint=FIX_OE_URL, ) -client_oe.retrieve_messages_until(message_type="A") +client_oe.retrieve_messages_until(message_type=["A"]) example = "This example shows how to place a single order. Order type LIMIT.\nCheck https://github.com/binance/binance-spot-api-docs/blob/master/fix-api.md#newordersingled for additional types." client_oe.logger.info(example) @@ -79,7 +58,7 @@ msg.append_pair(59, 1) # TIME IN FORCE client_oe.send_message(msg) -responses = client_oe.retrieve_messages_until(message_type="8") +responses = client_oe.retrieve_messages_until(message_type=["8"]) resp = next( (x for x in responses if x.message_type.decode("utf-8") == "8"), None, @@ -118,7 +97,7 @@ client_oe.logger.info(f"Error code: {error_code} | Reason: {text}") # LOGOUT client_oe.logger.info("LOGOUT (5)") client_oe.logout() -client_oe.retrieve_messages_until(message_type="5") +client_oe.retrieve_messages_until(message_type=["5"]) client_oe.logger.info( "Closing the connection with server as we already sent the logout message" ) diff --git a/examples/config.ini.example b/examples/config.ini.example index 3109643..1181dd9 100644 --- a/examples/config.ini.example +++ b/examples/config.ini.example @@ -6,4 +6,4 @@ [keys] API_KEY = -PATH_TO_PRIVATE_KEY_PEM_FILE = \ No newline at end of file +PATH_TO_PRIVATE_KEY_PEM_FILE = diff --git a/examples/constants.py b/examples/constants.py new file mode 100644 index 0000000..83892d3 --- /dev/null +++ b/examples/constants.py @@ -0,0 +1,52 @@ +import os +from pathlib import Path + +# FIX URLs +FIX_OE_URL = "tcp+tls://fix-oe.testnet.binance.vision:9000" +FIX_MD_URL = "tcp+tls://fix-md.testnet.binance.vision:9000" + +# Credentials +path = os.path.join(Path(__file__).parent.resolve(), "config.ini") + +# Response types +ACTION = {"0": "NEW", "1": "CHANGE", "2": "DELETE"} +AGGRESSOR_SIDE = {"1": "BUY", "2": "SELL"} +CONTINGENCY_TYPE = { + "1": "ONE_CANCELS_THE_OTHER ", + "2": "ONE_TRIGGERS_THE_OTHER", +} +LIMIT_TYPES = {"1": "ORDER_LIMIT", "2": "MESSAGE_LIMIT", "3": "SUBSCRIPTION_LIMIT"} +LIST_ORD_STATUS = {"3": "EXECUTING", "6": "ALL_DONE", "7": "REJECT"} +LIST_ORD_TYPE = {"1": "ONE_CANCELS_THE_OTHER", "2": "ONE_TRIGGERS_THE_OTHER"} +LIST_STATUS_TYPE = { + "2": "RESPONSE", + "4": "EXEC_STARTED", + "5": "ALL_DONE", + "100": "UPDATED", +} +LIST_TRIG_TYPE = {"ACTIVATED": "1", "PARTIALLY_FILLED": "2", "FILLED": "3"} +LIST_TRIG_ACTION = {"RELEASE": "1", "CANCEL": "2"} +ORD_STATUS = { + "0": "NEW", + "1": "PARTIALLY_FILLED", + "2": "FILLED", + "4": "CANCELED", + "6": "PENDING_CANCEL", + "8": "REJECTED", + "A": "PENDING_NEW", + "C": "EXPIRED", +} +ORD_TYPES = {"1": "MARKET", "2": "LIMIT", "3": "STOP", "4": "STOP_LIMIT", "P": "PEGGED"} +ORD_REJECT_REASON = {"99": "OTHER"} +RESOLUTIONS = {"s": "SECOND", "m": "MINUTE", "h": "HOUR", "d": "DAY"} +SIDES = {"1": "BUY", "2": "SELL"} +TIME_IN_FORCE = { + "1": "GOOD_TILL_CANCEL", + "3": "IMMEDIATE_OR_CANCEL", + "4": "FILL_OR_KILL", +} +UPDATE = {"0": "BID", "1": "OFFER", "2": "TRADE"} + +# Parameters +INSTRUMENT = "BNBUSDT" +TIMEOUT_SECONDS = 20 diff --git a/examples/general/__init__.py b/examples/general/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/general/current_messages_limit_rate.py b/examples/general/current_messages_limit_rate.py index 50ebec3..a78622d 100644 --- a/examples/general/current_messages_limit_rate.py +++ b/examples/general/current_messages_limit_rate.py @@ -1,31 +1,20 @@ #!/usr/bin/env python3 -import os -from pathlib import Path - from binance_fix_connector.fix_connector import ( BinanceFixConnector, create_market_data_session, create_order_entry_session, ) from binance_fix_connector.utils import get_api_key, get_private_key +from constants import path, RESOLUTIONS, LIMIT_TYPES, FIX_OE_URL, FIX_MD_URL # Credentials -path = config_path = os.path.join(Path(__file__).parent.resolve(), "..", "config.ini") API_KEY, PATH_TO_PRIVATE_KEY_PEM_FILE = get_api_key(path) -# FIX URLs -FIX_OE_URL = "tcp+tls://fix-oe.testnet.binance.vision:9000" -FIX_MD_URL = "tcp+tls://fix-md.testnet.binance.vision:9000" - -# Response types -RESOLUTIONS = {"s": "SECOND", "m": "MINUTE", "h": "HOUR", "d": "DAY"} -LIMIT_TYPES = {"1": "ORDER_LIMIT", "2": "MESSAGE_LIMIT", "3": "SUBSCRIPTION_LIMIT"} - def show_rendered_limit_session(client: BinanceFixConnector) -> None: """Show the current LIMITS the session has.""" - responses = client.retrieve_messages_until(message_type="XLR") + responses = client.retrieve_messages_until(message_type=["XLR"]) for msg in responses: if msg.message_type.decode("utf-8") == "XLR": limits = 0 if not msg.get(25003) else int(msg.get(25003).decode("utf-8")) @@ -73,7 +62,7 @@ def show_rendered_limit_session(client: BinanceFixConnector) -> None: private_key=get_private_key(PATH_TO_PRIVATE_KEY_PEM_FILE), endpoint=FIX_OE_URL, ) -client_oe.retrieve_messages_until(message_type="A") +client_oe.retrieve_messages_until(message_type=["A"]) example = "This example shows how to query for current session limits and how to parse it's data. Check https://github.com/binance/binance-spot-api-docs/blob/master/fix-api.md#limitqueryxlq for additional information." client_oe.logger.info(example) @@ -87,7 +76,7 @@ def show_rendered_limit_session(client: BinanceFixConnector) -> None: # LOGOUT client_oe.logger.info("LOGOUT (5)") client_oe.logout() -client_oe.retrieve_messages_until(message_type="5") +client_oe.retrieve_messages_until(message_type=["5"]) client_oe.logger.info( "Closing the connection with server as we already sent the logout message" ) @@ -99,7 +88,7 @@ def show_rendered_limit_session(client: BinanceFixConnector) -> None: private_key=get_private_key(PATH_TO_PRIVATE_KEY_PEM_FILE), endpoint=FIX_MD_URL, ) -client_md.retrieve_messages_until(message_type="A") +client_md.retrieve_messages_until(message_type=["A"]) example = "This example shows how to query for current session limits and how to parse it's data. Check https://github.com/binance/binance-spot-api-docs/blob/master/fix-api.md for additional information." client_md.logger.info(example) @@ -113,7 +102,7 @@ def show_rendered_limit_session(client: BinanceFixConnector) -> None: # LOGOUT client_md.logger.info("LOGOUT (5)") client_md.logout() -client_md.retrieve_messages_until(message_type="5") +client_md.retrieve_messages_until(message_type=["5"]) client_md.logger.info( "Closing the connection with server as we already sent the logout message" ) diff --git a/examples/general/instrument_list.py b/examples/general/instrument_list.py index 2e67d7b..2e4cf2a 100644 --- a/examples/general/instrument_list.py +++ b/examples/general/instrument_list.py @@ -1,25 +1,17 @@ #!/usr/bin/env python3 import time -import os -from pathlib import Path from binance_fix_connector.fix_connector import ( BinanceFixConnector, create_market_data_session, ) from binance_fix_connector.utils import get_api_key, get_private_key +from constants import path, FIX_MD_URL # Credentials -path = config_path = os.path.join(Path(__file__).parent.resolve(), "..", "config.ini") API_KEY, PATH_TO_PRIVATE_KEY_PEM_FILE = get_api_key(path) -# FIX URL -FIX_MD_URL = "tcp+tls://fix-md.testnet.binance.vision:9000" - -# Parameters -INSTRUMENT = "BNBUSDT" - def show_rendered_instrument_list(client: BinanceFixConnector) -> None: """Show the instrument list messages received.""" @@ -102,7 +94,7 @@ def show_rendered_instrument_list(client: BinanceFixConnector) -> None: private_key=get_private_key(PATH_TO_PRIVATE_KEY_PEM_FILE), endpoint=FIX_MD_URL, ) -client_md.retrieve_messages_until(message_type="A") +client_md.retrieve_messages_until(message_type=["A"]) example = "This example shows how to query information about active instruments.\nCheck https://github.com/binance/binance-spot-api-docs/blob/master/fix-api.md#instrumentlistrequestx for additional types." client_md.logger.info(example) @@ -125,7 +117,7 @@ def show_rendered_instrument_list(client: BinanceFixConnector) -> None: # LOGOUT client_md.logger.info("LOGOUT (5)") client_md.logout() -client_md.retrieve_messages_until(message_type="5") +client_md.retrieve_messages_until(message_type=["5"]) client_md.logger.info( "Closing the connection with server as we already sent the logout message" ) diff --git a/examples/market_stream/__init__.py b/examples/market_stream/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/maket_stream/depth_stream.py b/examples/market_stream/depth_stream.py similarity index 91% rename from examples/maket_stream/depth_stream.py rename to examples/market_stream/depth_stream.py index 002b8d0..d0f80ac 100644 --- a/examples/maket_stream/depth_stream.py +++ b/examples/market_stream/depth_stream.py @@ -1,8 +1,6 @@ #!/usr/bin/env python3 import time -import os -from pathlib import Path from datetime import datetime, timedelta @@ -11,26 +9,15 @@ create_market_data_session, ) from binance_fix_connector.utils import get_api_key, get_private_key +from constants import path, ACTION, FIX_MD_URL, INSTRUMENT, UPDATE, TIMEOUT_SECONDS # Credentials -path = config_path = os.path.join(Path(__file__).parent.resolve(), "..", "config.ini") API_KEY, PATH_TO_PRIVATE_KEY_PEM_FILE = get_api_key(path) -# FIX URLs -FIX_MD_URL = "tcp+tls://fix-md.testnet.binance.vision:9000" - -# Response types -UPDATE = {"0": "BID", "1": "OFFER", "2": "TRADE"} -ACTION = {"0": "NEW", "1": "CHANGE", "2": "DELETE"} - -# Parameters -INSTRUMENT = "BNBUSDT" -TIMEOUT_SECONDS = 20 - def show_rendered_snapshot_message(client: BinanceFixConnector) -> None: """Show the snapshot message received.""" - responses = client.retrieve_messages_until(message_type="W") + responses = client.retrieve_messages_until(message_type=["W"]) for msg in responses: if msg.message_type.decode("utf-8") == "W": client.logger.info("Parsing a MarketDataSnapshot (W) ...") @@ -115,7 +102,7 @@ def show_rendered_market_depth_stream(client: BinanceFixConnector) -> None: private_key=get_private_key(PATH_TO_PRIVATE_KEY_PEM_FILE), endpoint=FIX_MD_URL, ) -client_md.retrieve_messages_until(message_type="A") +client_md.retrieve_messages_until(message_type=["A"]) example = "This example shows how to subscribe to a book depth stream.\nCheck https://github.com/binance/binance-spot-api-docs/blob/master/fix-api.md#diffdepthstream for additional types." client_md.logger.info(example) @@ -166,7 +153,7 @@ def show_rendered_market_depth_stream(client: BinanceFixConnector) -> None: # LOGOUT client_md.logger.info("LOGOUT (5)") client_md.logout() -client_md.retrieve_messages_until(message_type="5") +client_md.retrieve_messages_until(message_type=["5"]) client_md.logger.info( "Closing the connection with server as we already sent the logout message" ) diff --git a/examples/maket_stream/ticker_stream.py b/examples/market_stream/ticker_stream.py similarity index 92% rename from examples/maket_stream/ticker_stream.py rename to examples/market_stream/ticker_stream.py index 5f5525b..82322bd 100644 --- a/examples/maket_stream/ticker_stream.py +++ b/examples/market_stream/ticker_stream.py @@ -1,8 +1,6 @@ #!/usr/bin/env python3 import time -import os -from pathlib import Path from datetime import datetime, timedelta from binance_fix_connector.fix_connector import ( @@ -10,25 +8,15 @@ create_market_data_session, ) from binance_fix_connector.utils import get_api_key, get_private_key +from constants import path, ACTION, FIX_MD_URL, INSTRUMENT, UPDATE, TIMEOUT_SECONDS # Credentials -path = config_path = os.path.join(Path(__file__).parent.resolve(), "..", "config.ini") API_KEY, PATH_TO_PRIVATE_KEY_PEM_FILE = get_api_key(path) -# FIX URL -FIX_MD_URL = "tcp+tls://fix-md.testnet.binance.vision:9000" - -# Response types -UPDATE = {"0": "BID", "1": "OFFER", "2": "TRADE"} - -# Parameters -INSTRUMENT = "BNBUSDT" -TIMEOUT_SECONDS = 20 - def show_rendered_snapshot_message(client: BinanceFixConnector) -> None: """Show the snapshot message received.""" - responses = client.retrieve_messages_until(message_type="W") + responses = client.retrieve_messages_until(message_type=["W"]) for msg in responses: if msg.message_type.decode("utf-8") == "W": client.logger.info("Parsing a MarketDataSnapshot (W) ...") @@ -106,7 +94,7 @@ def show_rendered_market_book_ticker_stream(client: BinanceFixConnector) -> None endpoint=FIX_MD_URL, recv_window=100, ) -client_md.retrieve_messages_until(message_type="A") +client_md.retrieve_messages_until(message_type=["A"]) example = "This example shows how to subscribe to a book ticker stream.\nCheck https://github.com/binance/binance-spot-api-docs/blob/master/fix-api.md#symbolbooktickerstream for additional types." client_md.logger.info(example) @@ -157,7 +145,7 @@ def show_rendered_market_book_ticker_stream(client: BinanceFixConnector) -> None # LOGOUT client_md.logger.info("LOGOUT (5)") client_md.logout() -client_md.retrieve_messages_until(message_type="5") +client_md.retrieve_messages_until(message_type=["5"]) client_md.logger.info( "Closing the connection with server as we already sent the logout message" ) diff --git a/examples/maket_stream/trade_stream.py b/examples/market_stream/trade_stream.py similarity index 90% rename from examples/maket_stream/trade_stream.py rename to examples/market_stream/trade_stream.py index 47ecd80..72ec6be 100644 --- a/examples/maket_stream/trade_stream.py +++ b/examples/market_stream/trade_stream.py @@ -1,8 +1,6 @@ #!/usr/bin/env python3 import time -import os -from pathlib import Path from datetime import datetime, timedelta from binance_fix_connector.fix_connector import ( @@ -10,22 +8,19 @@ create_market_data_session, ) from binance_fix_connector.utils import get_api_key, get_private_key +from constants import ( + path, + ACTION, + AGGRESSOR_SIDE, + FIX_MD_URL, + INSTRUMENT, + UPDATE, + TIMEOUT_SECONDS, +) # Credentials -path = config_path = os.path.join(Path(__file__).parent.resolve(), "..", "config.ini") API_KEY, PATH_TO_PRIVATE_KEY_PEM_FILE = get_api_key(path) -# FIX URL -FIX_MD_URL = "tcp+tls://fix-md.testnet.binance.vision:9000" - -# Response types -UPDATE = {"0": "BID", "1": "OFFER", "2": "TRADE"} -AGGRESSOR_SIDE = {"1": "BUY", "2": "SELL"} - -# Parameters -INSTRUMENT = "BNBUSDT" -TIMEOUT_SECONDS = 20 - def show_rendered_market_trade_stream(client: BinanceFixConnector) -> None: """Show the current TRADE stream messages received.""" @@ -79,7 +74,7 @@ def show_rendered_market_trade_stream(client: BinanceFixConnector) -> None: private_key=get_private_key(PATH_TO_PRIVATE_KEY_PEM_FILE), endpoint=FIX_MD_URL, ) -client_md.retrieve_messages_until(message_type="A") +client_md.retrieve_messages_until(message_type=["A"]) example = "This example shows how to subscribe to a trade stream.\nCheck https://github.com/binance/binance-spot-api-docs/blob/master/fix-api.md#tradestream for additional types." client_md.logger.info(example) @@ -133,7 +128,7 @@ def show_rendered_market_trade_stream(client: BinanceFixConnector) -> None: # LOGOUT client_md.logger.info("LOGOUT (5)") client_md.logout() -client_md.retrieve_messages_until(message_type="5") +client_md.retrieve_messages_until(message_type=["5"]) client_md.logger.info( "Closing the connection with server as we already sent the logout message" ) diff --git a/examples/trade/__init__.py b/examples/trade/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/trade/new_list_OTO_order.py b/examples/trade/new_list_OTO_order.py index 7db6f7c..c52e83f 100644 --- a/examples/trade/new_list_OTO_order.py +++ b/examples/trade/new_list_OTO_order.py @@ -1,47 +1,26 @@ #!/usr/bin/env python3 import time -import os -from pathlib import Path from binance_fix_connector.fix_connector import create_order_entry_session from binance_fix_connector.utils import get_api_key, get_private_key +from constants import ( + path, + FIX_OE_URL, + INSTRUMENT, + LIST_STATUS_TYPE, + LIST_ORD_STATUS, + LIST_ORD_TYPE, + ORD_REJECT_REASON, + ORD_TYPES, + ORD_STATUS, + SIDES, + TIME_IN_FORCE, +) # Credentials -path = config_path = os.path.join(Path(__file__).parent.resolve(), "..", "config.ini") API_KEY, PATH_TO_PRIVATE_KEY_PEM_FILE = get_api_key(path) -# FIX URL -FIX_OE_URL = "tcp+tls://fix-oe.testnet.binance.vision:9000" - -# Response types -ORD_STATUS = { - "0": "NEW", - "1": "PARTIALLY_FILLED", - "2": "FILLED", - "4": "CANCELED", - "6": "PENDING_CANCEL", - "8": "REJECTED", - "A": "PENDING_NEW", - "C": "EXPIRED", -} -ORD_TYPES = {"1": "MARKET", "2": "LIMIT", "3": "STOP", "4": "STOP_LIMIT"} -SIDES = {"1": "BUY", "2": "SELL"} -TIME_IN_FORCE = { - "1": "GOOD_TILL_CANCEL", - "3": "IMMEDIATE_OR_CANCEL", - "4": "FILL_OR_KILL", -} -ORD_REJECT_REASON = {"99": "OTHER"} -LIST_STATUS = {"2": "RESPONSE", "4": "EXEC_STARTED", "5": "ALL_DONE"} -LIST_ORD_STATUS = {"3": "EXECUTING", "6": "ALL_DONE", "7": "REJECT"} -LIST_ORD_TYPE = {"1": "ONE_CANCELS_THE_OTHER", "2": "ONE_TRIGGERS_THE_OTHER"} -LIST_TRIG_TYPE = {"ACTIVATED": "1", "PARTIALLY_FILLED": "2", "FILLED": "3"} -LIST_TRIG_ACTION = {"RELEASE": "1", "CANCEL": "2"} - -# Parameters -INSTRUMENT = "BNBUSDT" - client_oe = create_order_entry_session( api_key=API_KEY, private_key=get_private_key(PATH_TO_PRIVATE_KEY_PEM_FILE), @@ -49,7 +28,7 @@ ) -client_oe.retrieve_messages_until(message_type="A") +client_oe.retrieve_messages_until(message_type=["A"]) client_oe.get_all_new_messages_received() example = "This example shows how to place a list order type OTO, where both legs are Order type LIMIT.\nCheck https://github.com/binance/binance-spot-api-docs/blob/master/fix-api.md#neworderliste for additional types." @@ -84,7 +63,7 @@ msg.append_pair(25014, f"{identifier}") client_oe.send_message(msg) -responses = client_oe.retrieve_messages_until(message_type="N") +responses = client_oe.retrieve_messages_until(message_type=["N"]) resp = next( (x for x in responses if x.message_type.decode("utf-8") == "N"), None, @@ -99,7 +78,7 @@ list_ord_status = None if not resp.get(431) else resp.get(431).decode("utf-8") cl_list_id = None if not resp.get(25014) else resp.get(25014).decode("utf-8") contingency = None if not resp.get(1385) else resp.get(1385).decode("utf-8") -header = f"Symbol: {symbol} | List status: {LIST_STATUS.get(list_status_type,list_status_type)} | List order status: {LIST_ORD_STATUS.get(list_ord_status,list_ord_status)}" +header = f"Symbol: {symbol} | List status: {LIST_STATUS_TYPE.get(list_status_type,list_status_type)} | List order status: {LIST_ORD_STATUS.get(list_ord_status,list_ord_status)}" header_2 = f"Client list id: {cl_list_id} | List type: {LIST_ORD_TYPE.get(contingency,contingency)} | " client_oe.logger.info(header) client_oe.logger.info(header_2) @@ -123,8 +102,8 @@ # +++++++++++++++++++++++++++ -responses = client_oe.retrieve_messages_until(message_type="8") -responses.extend(client_oe.retrieve_messages_until(message_type="8")) +responses = client_oe.retrieve_messages_until(message_type=["8"]) +responses.extend(client_oe.retrieve_messages_until(message_type=["8"])) resp = next( ( x @@ -216,7 +195,7 @@ # LOGOUT client_oe.logger.info("LOGOUT (5)") client_oe.logout() -client_oe.retrieve_messages_until(message_type="5") +client_oe.retrieve_messages_until(message_type=["5"]) client_oe.logger.info( "Closing the connection with server as we already sent the logout message" ) diff --git a/examples/trade/new_order.py b/examples/trade/new_order.py index 7b14ebd..48bf5f7 100644 --- a/examples/trade/new_order.py +++ b/examples/trade/new_order.py @@ -1,48 +1,29 @@ #!/usr/bin/env python3 import time -import os -from pathlib import Path from binance_fix_connector.fix_connector import create_order_entry_session from binance_fix_connector.utils import get_api_key, get_private_key +from constants import ( + path, + FIX_OE_URL, + INSTRUMENT, + ORD_REJECT_REASON, + ORD_STATUS, + ORD_TYPES, + SIDES, + TIME_IN_FORCE, +) # Credentials -path = config_path = os.path.join(Path(__file__).parent.resolve(), "..", "config.ini") API_KEY, PATH_TO_PRIVATE_KEY_PEM_FILE = get_api_key(path) -# FIX URL -FIX_OE_URL = "tcp+tls://fix-oe.testnet.binance.vision:9000" - -# Response types -ORD_STATUS = { - "0": "NEW", - "1": "PARTIALLY_FILLED", - "2": "FILLED", - "4": "CANCELED", - "6": "PENDING_CANCEL", - "8": "REJECTED", - "A": "PENDING_NEW", - "C": "EXPIRED", -} -ORD_TYPES = {"1": "MARKET", "2": "LIMIT", "3": "STOP", "4": "STOP_LIMIT"} -SIDES = {"1": "BUY", "2": "SELL"} -TIME_IN_FORCE = { - "1": "GOOD_TILL_CANCEL", - "3": "IMMEDIATE_OR_CANCEL", - "4": "FILL_OR_KILL", -} -ORD_REJECT_REASON = {"99": "OTHER"} - -# Parameter -INSTRUMENT = "BNBUSDT" - client_oe = create_order_entry_session( api_key=API_KEY, private_key=get_private_key(PATH_TO_PRIVATE_KEY_PEM_FILE), endpoint=FIX_OE_URL, ) -client_oe.retrieve_messages_until(message_type="A") +client_oe.retrieve_messages_until(message_type=["A"]) example = "This example shows how to place a single order. Order type LIMIT.\nCheck https://github.com/binance/binance-spot-api-docs/blob/master/fix-api.md#newordersingled for additional types." client_oe.logger.info(example) @@ -59,7 +40,7 @@ client_oe.send_message(msg) -responses = client_oe.retrieve_messages_until(message_type="8") +responses = client_oe.retrieve_messages_until(message_type=["8"]) resp = next( (x for x in responses if x.message_type.decode("utf-8") == "8"), None, @@ -98,7 +79,7 @@ # LOGOUT client_oe.logger.info("LOGOUT (5)") client_oe.logout() -client_oe.retrieve_messages_until(message_type="5") +client_oe.retrieve_messages_until(message_type=["5"]) client_oe.logger.info( "Closing the connection with server as we already sent the logout message" ) diff --git a/examples/trade/new_order_amend_keep_priority.py b/examples/trade/new_order_amend_keep_priority.py new file mode 100644 index 0000000..6971b32 --- /dev/null +++ b/examples/trade/new_order_amend_keep_priority.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 + +import time + +from binance_fix_connector.fix_connector import create_order_entry_session +from binance_fix_connector.utils import get_api_key, get_private_key +from constants import path, FIX_OE_URL, INSTRUMENT + +# Credentials +API_KEY, PATH_TO_PRIVATE_KEY_PEM_FILE = get_api_key(path) + +client_oe = create_order_entry_session( + api_key=API_KEY, + private_key=get_private_key(PATH_TO_PRIVATE_KEY_PEM_FILE), + endpoint=FIX_OE_URL, +) + +client_oe.retrieve_messages_until(message_type=["A"]) + +# PLACING SIMPLE ORDER +msg = client_oe.create_fix_message_with_basic_header("D") +msg.append_pair(38, 0.01) # ORD QTY +msg.append_pair(40, 1) # ORD TYPE +msg.append_pair(11, str(time.time_ns())) # CL ORD ID +msg.append_pair(54, 1) # SIDE +msg.append_pair(55, INSTRUMENT) # SYMBOL +client_oe.send_message(msg) + + +responses = client_oe.retrieve_messages_until(message_type=["8"]) +resp = next( + (x for x in responses if x.message_type.decode("utf-8") == "8"), + None, +) + +ord_id = None if not resp.get(37) else resp.get(37).decode("utf-8") + +# PLACING AMEND KEEP PRIORITY ORDER +cl_ord_id = str(time.time_ns()) +msg = client_oe.create_fix_message_with_basic_header("XAK") +msg.append_pair(11, cl_ord_id) # ClOrdID +msg.append_pair(37, ord_id) # ord_id +msg.append_pair(55, INSTRUMENT) # SYMBOL +msg.append_pair(38, 0.02) # QTY +client_oe.send_message(msg) + + +amend_responses = client_oe.retrieve_messages_until( + message_type=["8", "XAR"], message_cl_ord_id=cl_ord_id +) +resp = next( + ( + x + for x in amend_responses + if x.get(11) and x.get(11).decode("utf-8") == cl_ord_id + ), + None, +) + +msg_type = None if not resp.get(35) else resp.get(35).decode("utf-8") +if msg_type == "XAR": + order_id = None if not resp.get(37) else resp.get(37).decode("utf-8") + symbol = None if not resp.get(55) else resp.get(55).decode("utf-8") + order_qty = None if not resp.get(38) else resp.get(38).decode("utf-8") + error_text = None if not resp.get(25016) else resp.get(25016).decode("utf-8") + text = None if not resp.get(58) else resp.get(58).decode("utf-8") + + client_oe.logger.info("Parsing response Order Amend Reject (XAR) for an order.") + client_oe.logger.info( + f"Order -> Order ID: {order_id} / Symbol: {symbol} / Order Qty: {order_qty}" + ) + client_oe.logger.info(f"Error code: {error_text} | Reason: {text}") +elif msg_type == "8": + order_id = None if not resp.get(37) else resp.get(37).decode("utf-8") + symbol = None if not resp.get(55) else resp.get(55).decode("utf-8") + order_qty = None if not resp.get(38) else resp.get(38).decode("utf-8") + ord_status = None if not resp.get(39) else resp.get(39).decode("utf-8") + + client_oe.logger.info("Parsing response Execution Report (8) for an order amend.") + client_oe.logger.info( + f"Order -> Order ID: {order_id} / Symbol: {symbol} / Order Qty: {order_qty} / Status: {ord_status}" + ) + +# LOGOUT +client_oe.logger.info("LOGOUT (5)") +client_oe.logout() +client_oe.retrieve_messages_until(message_type=["5"]) +client_oe.logger.info( + "Closing the connection with server as we already sent the logout message" +) +client_oe.disconnect() diff --git a/examples/trade/new_order_cancel.py b/examples/trade/new_order_cancel.py new file mode 100644 index 0000000..190b6d6 --- /dev/null +++ b/examples/trade/new_order_cancel.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 + +import time + +from binance_fix_connector.fix_connector import create_order_entry_session +from binance_fix_connector.utils import get_api_key, get_private_key +from constants import ( + path, + FIX_OE_URL, + INSTRUMENT, + CONTINGENCY_TYPE, + LIST_STATUS_TYPE, + LIST_ORD_STATUS, +) + +# Credentials +API_KEY, PATH_TO_PRIVATE_KEY_PEM_FILE = get_api_key(path) + +client_oe = create_order_entry_session( + api_key=API_KEY, + private_key=get_private_key(PATH_TO_PRIVATE_KEY_PEM_FILE), + endpoint=FIX_OE_URL, +) + +client_oe.retrieve_messages_until(message_type=["A"]) + +# PLACING CANCEL ORDER +msg = client_oe.create_fix_message_with_basic_header("F") +msg.append_pair(11, "order_id") # ClOrdID +msg.append_pair(55, INSTRUMENT) # SYMBOL +client_oe.send_message(msg) + +responses = client_oe.retrieve_messages_until(message_type=["3", "8", "9", "N"]) + + +def show_execution_response(responses, client_oe): + """Show execution response result.""" + + client_oe.logger.info( + "Parsing response Execution Report (8) for an order LIMIT type." + ) + + cl_ord_id = None if not resp.get(11) else resp.get(11).decode("utf-8") + order_qty = None if not resp.get(38) else resp.get(38).decode("utf-8") + ord_type = None if not resp.get(40) else resp.get(40).decode("utf-8") + side = None if not resp.get(54) else resp.get(54).decode("utf-8") + symbol = None if not resp.get(55) else resp.get(55).decode("utf-8") + price = None if not resp.get(44) else resp.get(44).decode("utf-8") + time_in_force = None if not resp.get(59) else resp.get(59).decode("utf-8") + cum_qty = None if not resp.get(14) else resp.get(14).decode("utf-8") + last_qty = None if not resp.get(32) else resp.get(32).decode("utf-8") + ord_status = None if not resp.get(39) else resp.get(39).decode("utf-8") + ord_rej_reason = None if not resp.get(103) else resp.get(103).decode("utf-8") + error_code = None if not resp.get(25016) else resp.get(25016).decode("utf-8") + text = None if not resp.get(58) else resp.get(58).decode("utf-8") + + client_oe.logger.info(f"Client order ID: {cl_ord_id}") + client_oe.logger.info(f"Symbol: {symbol}") + client_oe.logger.info( + f"Order -> Type: {ORD_TYPES.get(ord_type, ord_type)} | Side: {SIDES.get(side, side)} | TimeInForce: {TIME_IN_FORCE.get(time_in_force,time_in_force)}", + ) + client_oe.logger.info( + f"Price: {price} | Quantity: {order_qty} | cum qty: {cum_qty} | last qty: {last_qty}" + ) + client_oe.logger.info( + f"Status: {ORD_STATUS.get(ord_status,ord_status)} | Msg: {ORD_REJECT_REASON.get(ord_rej_reason,ord_rej_reason)}", + ) + client_oe.logger.info(f"Error code: {error_code} | Reason: {text}") + + +def show_order_cancel_reject_response(responses, client_oe): + """Show order cancel reject response result.""" + + client_oe.logger.info("Parsing response Order Cancel Reject (9) ...") + + cl_ord_id = None if not resp.get(11) else resp.get(11).decode("utf-8") + orig_cl_ord_id = None if not resp.get(41) else resp.get(41).decode("utf-8") + order_id = None if not resp.get(37) else resp.get(37).decode("utf-8") + symbol = None if not resp.get(55) else resp.get(55).decode("utf-8") + cancel_restrictions = ( + None if not resp.get(25002) else resp.get(25002).decode("utf-8") + ) + cxl_rej_response_to = None if not resp.get(434) else resp.get(434).decode("utf-8") + error_code = None if not resp.get(25016) else resp.get(25016).decode("utf-8") + text = None if not resp.get(58) else resp.get(58).decode("utf-8") + + client_oe.logger.info(f"Client order ID: {cl_ord_id}") + client_oe.logger.info(f"Original Client order ID: {orig_cl_ord_id}") + client_oe.logger.info(f"Order ID: {order_id}") + client_oe.logger.info(f"Symbol: {symbol}") + client_oe.logger.info(f"Cancel Restrictions: {cancel_restrictions}") + client_oe.logger.info(f"Cancel Reject Response To: {cxl_rej_response_to}") + client_oe.logger.info(f"Reason: {text}") + + +def show_list_status_response(responses, client_oe): + """Show list status response result.""" + + client_oe.logger.info("Parsing response List Status (N) ...") + symbol = None if not resp.get(55) else resp.get(55).decode("utf-8") + list_id = None if not resp.get(66) else resp.get(66).decode("utf-8") + contingency_type = None if not resp.get(1385) else resp.get(1385).decode("utf-8") + list_status_type = None if not resp.get(429) else resp.get(429).decode("utf-8") + list_order_status = None if not resp.get(431) else resp.get(431).decode("utf-8") + list_reject_reason = None if not resp.get(1386) else resp.get(1386).decode("utf-8") + transact_time = None if not resp.get(60) else resp.get(60).decode("utf-8") + error_code = None if not resp.get(25016) else resp.get(25016).decode("utf-8") + text = None if not resp.get(58) else resp.get(58).decode("utf-8") + no_orders = 0 if not resp.get(73) else int(resp.get(73).decode("utf-8")) + + client_oe.logger.info(f"Symbol: {symbol}") + client_oe.logger.info(f"List ID: {list_id}") + client_oe.logger.info( + f"Contingency Type: {CONTINGENCY_TYPE.get(contingency_type, contingency_type)}" + ) + client_oe.logger.info( + f"List Status Type: {LIST_STATUS_TYPE.get(list_status_type, list_status_type)}" + ) + client_oe.logger.info( + f"List Order Status: {LIST_ORD_STATUS.get(list_order_status, list_order_status)}" + ) + client_oe.logger.info(f"List Reject Reason: {list_reject_reason}") + client_oe.logger.info(f"Transact Time: {transact_time}") + client_oe.logger.info(f"Number of Orders: {no_orders}") + client_oe.logger.info(f"Error code: {error_code} | Reason: {text}") + + +for x in responses: + if x.message_type.decode("utf-8") == "8": + client_oe.logger.info("Parsing a ExecutionReport (8) response...") + show_execution_response(responses, client_oe) + elif x.message_type.decode("utf-8") == "9": + client_oe.logger.info("Parsing a OrderCancelReject (9) response...") + show_order_cancel_reject_response(responses, client_oe) + elif x.message_type.decode("utf-8") == "N": + client_oe.logger.info("Parsing a ListStatus (N) response...") + show_list_status_response(responses, client_oe) + +# LOGOUT +client_oe.logger.info("LOGOUT (5)") +client_oe.logout() +client_oe.retrieve_messages_until(message_type=["5"]) +client_oe.logger.info( + "Closing the connection with server as we already sent the logout message" +) +client_oe.disconnect() diff --git a/examples/trade/new_order_cancel_new_order.py b/examples/trade/new_order_cancel_new_order.py new file mode 100644 index 0000000..570d9e4 --- /dev/null +++ b/examples/trade/new_order_cancel_new_order.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 + +import time + +from binance_fix_connector.fix_connector import create_order_entry_session +from binance_fix_connector.utils import get_api_key, get_private_key +from constants import path, FIX_OE_URL, INSTRUMENT, ORD_STATUS + +# Credentials +API_KEY, PATH_TO_PRIVATE_KEY_PEM_FILE = get_api_key(path) + +client_oe = create_order_entry_session( + api_key=API_KEY, + private_key=get_private_key(PATH_TO_PRIVATE_KEY_PEM_FILE), + endpoint=FIX_OE_URL, +) + +client_oe.retrieve_messages_until(message_type=["A"]) + +# PLACING SIMPLE ORDER +cl_ord_id = str(time.time_ns()) +msg = client_oe.create_fix_message_with_basic_header("D") +msg.append_pair(38, 0.02) # ORD QTY +msg.append_pair(40, 1) # ORD TYPE +msg.append_pair(11, cl_ord_id) # CL ORD ID +msg.append_pair(54, 1) # SIDE +msg.append_pair(55, INSTRUMENT) # SYMBOL +client_oe.send_message(msg) + +responses = client_oe.retrieve_messages_until(message_type=["8"]) +resp = next( + (x for x in responses if x.message_type.decode("utf-8") == "8"), + None, +) + +ord_id = None if not resp.get(37) else resp.get(37).decode("utf-8") + +# PLACING CANCEL REQUEST AND NEW ORDER SINGLE MESSAGE +cl_ord_id_2 = str(time.time_ns()) +msg = client_oe.create_fix_message_with_basic_header("XCN") +msg.append_pair(11, cl_ord_id_2) # ClOrdID +msg.append_pair(25033, 2) # OrderCancelRequestAndNewOrderSingleMode +msg.append_pair(37, ord_id) # ord_id +msg.append_pair(25034, cl_ord_id) # CancelClOrdID +msg.append_pair(40, 1) # ORD TYPE +msg.append_pair(55, INSTRUMENT) # SYMBOL +msg.append_pair(54, 2) # SIDE +msg.append_pair(38, 0.01) # QTY +client_oe.send_message(msg) + +amend_responses = client_oe.retrieve_messages_until( + message_type=["8", "9"], message_cl_ord_id=cl_ord_id_2 +) +resp = next( + ( + x + for x in amend_responses + if x.get(11) and x.get(11).decode("utf-8") == cl_ord_id + ), + None, +) + +msg_type = None if not resp.get(35) else resp.get(35).decode("utf-8") +if msg_type == "8": + order_id = None if not resp.get(37) else resp.get(37).decode("utf-8") + symbol = None if not resp.get(55) else resp.get(55).decode("utf-8") + order_qty = None if not resp.get(38) else resp.get(38).decode("utf-8") + self_trade_prevention_type = ( + None if not resp.get(25001) else resp.get(25001).decode("utf-8") + ) + exec_type = None if not resp.get(150) else resp.get(150).decode("utf-8") + cum_qty = None if not resp.get(14) else resp.get(14).decode("utf-8") + leaves_qty = None if not resp.get(151) else resp.get(151).decode("utf-8") + cum_quote_qty = None if not resp.get(25017) else resp.get(25017).decode("utf-8") + aggressor_indicator = None if not resp.get(1057) else resp.get(1057).decode("utf-8") + trade_id = None if not resp.get(1003) else resp.get(1003).decode("utf-8") + last_px = None if not resp.get(31) else resp.get(31).decode("utf-8") + last_qty = None if not resp.get(32) else resp.get(32).decode + order_status = None if not resp.get(39) else resp.get(39).decode("utf-8") + client_oe.logger.info("CANCEL REQUEST AND NEW ORDER SINGLE processed successfully.") + client_oe.logger.info(f"Client order ID: {cl_ord_id}") + client_oe.logger.info(f"Symbol: {symbol}") + client_oe.logger.info( + f"Order -> Self Trade Prevention Type: {self_trade_prevention_type} | Exec Type: {exec_type} | Cum Qty: {cum_qty} | Leaves Qty: {leaves_qty} | Cum Quote Qty: {cum_quote_qty} | Aggressor Indicator: {aggressor_indicator} | Trade ID: {trade_id} | Last Px: {last_px}", + ) + client_oe.logger.info( + f"Status: {ORD_STATUS.get(order_status, order_status)}", + ) +elif msg_type == "9": + order_id = None if not resp.get(37) else resp.get(37).decode("utf-8") + symbol = None if not resp.get(55) else resp.get(55).decode("utf-8") + error_text = None if not resp.get(25016) else resp.get(25016).decode("utf-8") + text = None if not resp.get(58) else resp.get(58).decode("utf-8") + + client_oe.logger.info("*" * 50) + client_oe.logger.info("Parsing response Order Cancel Reject (9) for an order.") + client_oe.logger.info(f"Order -> Order ID: {order_id} / Symbol: {symbol}") + client_oe.logger.info(f"Error code: {error_text} | Reason: {text}") + +# LOGOUT +client_oe.logger.info("LOGOUT (5)") +client_oe.logout() +client_oe.retrieve_messages_until(message_type=["5"]) +client_oe.logger.info( + "Closing the connection with server as we already sent the logout message" +) +client_oe.disconnect() diff --git a/pyproject.toml b/pyproject.toml index e1f31c4..6edc953 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "binance_fix_connector" -version = "1.1.0" +version = "1.2.0" authors = [{name = "Binance"}] description = "This is a simple Python library that provides access to Binance Financial Information eXchange (FIX) SPOT messages using the FIX protocol." readme = "README.md" diff --git a/src/binance_fix_connector/fix_connector.py b/src/binance_fix_connector/fix_connector.py index ef8defb..5f00b1f 100644 --- a/src/binance_fix_connector/fix_connector.py +++ b/src/binance_fix_connector/fix_connector.py @@ -38,6 +38,7 @@ class FixMsgTypes: LOGOUT = "5" LOGON = "A" REJECT = "3" + NEWS = "B" class FixTags: @@ -64,7 +65,7 @@ class FixTags: RESPONSE_MODE = "25036" -def __create_session( +def _create_session( api_key: str, private_key: ed25519.Ed25519PrivateKey, endpoint: str, @@ -118,7 +119,7 @@ def create_market_data_session( Message handling: 1->UNORDERED 2->SEQUENTIAL """ - return __create_session( + return _create_session( endpoint=endpoint, api_key=api_key, private_key=private_key, @@ -154,7 +155,7 @@ def create_order_entry_session( Message handling: 1->UNORDERED 2->SEQUENTIAL """ - return __create_session( + return _create_session( endpoint=endpoint, api_key=api_key, private_key=private_key, @@ -192,7 +193,7 @@ def create_drop_copy_session( Message handling: 1->UNORDERED 2->SEQUENTIAL """ - return __create_session( + return _create_session( endpoint=endpoint, api_key=api_key, private_key=private_key, @@ -227,6 +228,7 @@ def __init__( message_handling: int = 2, response_mode: int = 1, drop_copy_flag: bool = False, + restart: bool = True, ) -> None: """ Create a fix session. @@ -248,6 +250,7 @@ def __init__( message_handling (int, optional): The message handling. Defaults to 2 (SEQUENTIAL). response_mode (int, optional): The response mode. Defaults to 1 (EVERYTHING). drop_copy_flag (bool, optional): The drop copy flag. Defaults to False. + restart (bool, optional): Whether to enable automatic session restart upon server notification. Defaults to True. Raises: @@ -301,6 +304,12 @@ def __init__( self.queue_msg_received: Queue[FixMessage] = Queue() self.messages_sent: list[FixMessage] = [] + self.restart: bool = restart + self.restart_flag: bool = False + self.restart_session = None + self.restart_timer = None + self.restart_time = None + logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", @@ -383,7 +392,11 @@ def parse_server_response(self) -> list[FixMessage]: raw_messages = [f"8={x}" for x in msg.split(f"{_SOH_}8=") if x] messages: list[FixMessage] = [] for i in range(len(raw_messages)): - tag_values = [x for x in raw_messages[i].split(_SOH_) if x != ""] + tag_values = [ + x + for x in raw_messages[i].split(_SOH_) + if "=" in x and not x.startswith("=") + ] if ( len(tag_values) > 1 and tag_values[0] == "8=" @@ -445,7 +458,16 @@ def __receive_messages(self) -> None: if messages: self.__data = b"" for msg in messages: - clean_message = msg.encode().decode("utf-8").replace(_SOH_, "|") + try: + clean_message = ( + msg.encode().decode("utf-8").replace(_SOH_, "|") + ) + except ValueError: + self.logger.warning( + "Message decoded but could not be logged (missing MsgType: 35)" + ) + continue + self.logger.info( "%sServer=>Client: %s%s", GREEN, clean_message, RESET ) @@ -492,6 +514,20 @@ def on_message_received(self, messages: list[FixMessage]) -> None: "Sending a heartbeat message as we received a TestRequest message from server" ) self.heartbeat(test_req_resp_id) + if msg_type == FixMsgTypes.NEWS: + self.logger.info("News message received from server.") + news_text = ( + None if not message.get(148) else message.get(148).decode("utf-8") + ) + self.logger.info("NewsText: %s", news_text) + if self.restart: + self.schedule_restart() + if msg_type == FixMsgTypes.LOGOUT and self.restart is False: + self.logger.info( + "Logout message received from server. Closing connection." + ) + self.logout() + self.disconnect() def get_all_new_messages_received(self) -> list[FixMessage]: """ @@ -511,18 +547,27 @@ def get_all_new_messages_received(self) -> list[FixMessage]: def retrieve_messages_until( self, - message_type: str, + message_type: str | list[str], + message_cl_ord_id: str | None = None, timeout_seconds: int = 3, ) -> list[FixMessage]: """Return all the FIX messages received from the server until message of desired type is received.""" # with self.lock: + if isinstance(message_type, str): + message_type = [message_type] messages: list[FixMessage] = [] timeout = datetime.now() + timedelta(seconds=timeout_seconds) while datetime.now() < timeout: for _ in range(self.queue_msg_received.qsize()): msg = self.queue_msg_received.get() messages.append(msg) - if message_type and msg.get("35").decode("utf-8") == message_type: + if message_cl_ord_id: + cl_ord_id = ( + None if not msg.get("11") else msg.get("11").decode("utf-8") + ) + if cl_ord_id == message_cl_ord_id: + return messages + elif message_type and msg.get("35").decode("utf-8") in message_type: return messages time.sleep(0.001) @@ -600,31 +645,40 @@ def logon( recv_window (str | None, optional): The recv window. Defaults to None. """ - self.msg_seq_num = 0 - msg = self.create_fix_message_with_basic_header(FixMsgTypes.LOGON, recv_window) - signature = self.generate_signature( - self.sender_comp_id, - self.target_comp_id, - self.msg_seq_num, - msg.get(FixTags.SENDING_TIME).decode("utf-8"), - ) + if self.restart_flag: + self.logger.info( + "The Server will soon restart. Can't start any new connections" + ) + else: + self.msg_seq_num = 0 + msg = self.create_fix_message_with_basic_header( + FixMsgTypes.LOGON, recv_window + ) + signature = self.generate_signature( + self.sender_comp_id, + self.target_comp_id, + self.msg_seq_num, + msg.get(FixTags.SENDING_TIME).decode("utf-8"), + ) - msg.append_pair(FixTags.ENCRYPT_METHOD, self.encrypt_method, header=False) - msg.append_pair(FixTags.HEART_BT_INT, self.heart_bt_int, header=False) - msg.append_data( - FixTags.RAW_DATA_LENGTH, FixTags.RAW_DATA, signature, header=False - ) + msg.append_pair(FixTags.ENCRYPT_METHOD, self.encrypt_method, header=False) + msg.append_pair(FixTags.HEART_BT_INT, self.heart_bt_int, header=False) + msg.append_data( + FixTags.RAW_DATA_LENGTH, FixTags.RAW_DATA, signature, header=False + ) - msg.append_pair( - FixTags.RESET_SEQ_NUM_FLAG, self.reset_seq_num_flag, header=False - ) + msg.append_pair( + FixTags.RESET_SEQ_NUM_FLAG, self.reset_seq_num_flag, header=False + ) - msg.append_pair(FixTags.USERNAME, self.api_key, header=False) - msg.append_pair(FixTags.MESSAGE_HANDLING, self.message_handling, header=False) - msg.append_pair(FixTags.RESPONSE_MODE, self.response_mode, header=False) - msg.append_pair(FixTags.DROP_COPY_FLAG, self.drop_copy_flag, header=False) + msg.append_pair(FixTags.USERNAME, self.api_key, header=False) + msg.append_pair( + FixTags.MESSAGE_HANDLING, self.message_handling, header=False + ) + msg.append_pair(FixTags.RESPONSE_MODE, self.response_mode, header=False) + msg.append_pair(FixTags.DROP_COPY_FLAG, self.drop_copy_flag, header=False) - self.send_message(msg) + self.send_message(msg) def logout(self, text: str | None = None, recv_window: str | None = None) -> None: """ @@ -686,3 +740,75 @@ def disconnect(self) -> None: with contextlib.suppress(OSError): self.sock.shutdown(socket.SHUT_RDWR) self.sock.close() + + def schedule_restart(self) -> None: + """Schedule the session restart in 10 minutes.""" + if not self.restart_flag: + self.restart_flag = True + self.restart_time = datetime.now() + timedelta(minutes=10) + self.logger.info(f"Session restart scheduled for {self.restart_time}") + + self.restart_session = _create_session( + api_key=self.api_key, + private_key=self.private_key, + endpoint=self.endpoint, + sender_comp_id=self.sender_comp_id, + target_comp_id=self.target_comp_id, + fix_version=self.fix_version, + socket_buffer_size=self.socket_buffer_size, + heart_bt_int=self.heart_bt_int, + reset_seq_num_flag=self.reset_seq_num_flag, + encrypt_method=self.encrypt_method, + message_handling=self.message_handling, + response_mode=self.response_mode, + drop_copy_flag=self.drop_copy_flag, + ) + + if self.restart_timer is None or not self.restart_timer.is_alive(): + self.restart_timer = threading.Thread( + target=self._restart_timer_thread, daemon=True + ) + self.restart_timer.start() + + def _restart_timer_thread(self) -> None: + """Thread that waits until restart time and then performs the restart.""" + while self.restart_flag and datetime.now() < self.restart_time: + time.sleep(1) + + if self.restart_flag: + self.logger.info("Performing scheduled restart...") + self.reconnect() + + def reconnect(self) -> None: + """Perform the actual reconnection to the new session.""" + if not self.restart_flag or not self.restart_session: + self.logger.warning("No restart scheduled or restart session not created") + return + + try: + self.logger.info("Disconnecting current session...") + self.disconnect() + + time.sleep(1) + + self.logger.info("Connecting to new session...") + self.restart_session.connect() + self.sock = self.restart_session.sock + self.ssl_sock = self.restart_session.ssl_sock + self.receive_thread = self.restart_session.receive_thread + self.is_connected = self.restart_session.is_connected + self.msg_seq_num = self.restart_session.msg_seq_num + self.queue_msg_received = self.restart_session.queue_msg_received + self.messages_sent = self.restart_session.messages_sent + + self.__dict__.update(self.restart_session.__dict__) + + self.logger.info("Restart completed successfully") + self.restart_flag = False + self.restart_time = None + + except Exception as e: + self.logger.exception("Error during restart") + self.restart_flag = False + self.restart_time = None + raise diff --git a/tests/client/test_binance_fix_connector.py b/tests/client/test_binance_fix_connector.py new file mode 100644 index 0000000..072438a --- /dev/null +++ b/tests/client/test_binance_fix_connector.py @@ -0,0 +1,283 @@ +import unittest +import threading +import time + +from datetime import datetime, timedelta +from unittest.mock import patch, MagicMock, PropertyMock +from binance_fix_connector.fix_connector import ( + BinanceFixConnector, + FixMsgTypes, + FixTags, + _create_session, +) + + +class TestFixSessionRestart(unittest.TestCase): + + def setUp(self): + """Set up test fixtures.""" + self.api_key = "test_api_key" + self.private_key = MagicMock() + self.endpoint = "test.endpoint.com" + self.sender_comp_id = "TEST123" + self.target_comp_id = "SPOT" + + self.session = BinanceFixConnector( + api_key=self.api_key, + private_key=self.private_key, + endpoint=self.endpoint, + sender_comp_id=self.sender_comp_id, + target_comp_id=self.target_comp_id, + fix_version="FIX.4.4", + heart_bt_int=30, + message_handling=2, + ) + + self.session.sock = MagicMock() + self.session.is_connected = True + self.session.logger = MagicMock() + + def create_mock_message(self, msg_type, additional_fields=None): + """Helper to create mock FIX messages.""" + mock_msg = MagicMock() + fields = {FixTags.MSG_TYPE: msg_type.encode("utf-8")} + if additional_fields: + fields.update( + { + k: v.encode("utf-8") if isinstance(v, str) else v + for k, v in additional_fields.items() + } + ) + + def get_side_effect(tag): + return fields.get(tag) + + mock_msg.get.side_effect = get_side_effect + return mock_msg + + @patch("binance_fix_connector.fix_connector._create_session") + def test_news_message_triggers_restart_schedule(self, mock_create_session): + """Test that receiving a NEWS message schedules a restart.""" + mock_new_session = MagicMock() + mock_create_session.return_value = mock_new_session + + news_msg = self.create_mock_message( + FixMsgTypes.NEWS, {148: "Server restart scheduled in 10 minutes"} + ) + + with patch("binance_fix_connector.fix_connector.datetime") as mock_datetime: + mock_now = datetime(2024, 1, 1, 12, 0, 0) + mock_datetime.now.return_value = mock_now + mock_datetime.side_effect = lambda *args, **kw: ( + datetime(*args, **kw) if args else mock_datetime + ) + + self.session.on_message_received([news_msg]) + + self.assertTrue(self.session.restart_flag) + self.assertIsNotNone(self.session.restart_time) + self.assertIsNotNone(self.session.restart_timer) + self.assertIsInstance(self.session.restart_timer, threading.Thread) + + @patch("binance_fix_connector.fix_connector._create_session") + @patch("binance_fix_connector.fix_connector.threading.Thread") + def test_multiple_news_messages_dont_create_multiple_timers( + self, mock_thread, mock_create_session + ): + """Test that multiple NEWS messages don't create multiple timers.""" + mock_new_session = MagicMock() + mock_create_session.return_value = mock_new_session + + mock_thread_instance = MagicMock() + mock_thread.return_value = mock_thread_instance + + news_msg = self.create_mock_message( + FixMsgTypes.NEWS, {148: "Server restart scheduled in 10 minutes"} + ) + + with patch("binance_fix_connector.fix_connector.datetime") as mock_datetime: + mock_now = datetime(2024, 1, 1, 12, 0, 0) + mock_datetime.now.return_value = mock_now + + self.session.on_message_received([news_msg]) + first_timer = self.session.restart_timer + + self.session.on_message_received([news_msg]) + second_timer = self.session.restart_timer + + self.assertIs(first_timer, second_timer) + self.assertEqual(mock_thread.call_count, 1) + + @patch("binance_fix_connector.fix_connector.time.sleep") + def test_reconnect_performs_proper_restart(self, mock_sleep): + """Test that reconnect properly restarts the session.""" + mock_new_session = MagicMock() + mock_new_session.connect = MagicMock() + mock_new_session.is_connected = True + mock_new_session.msg_seq_num = 100 + mock_new_session.sock = MagicMock() + mock_new_session.queue_msg_received = MagicMock() + mock_new_session.messages_sent = [] + + self.session.restart_flag = True + self.session.restart_session = mock_new_session + self.session.msg_seq_num = 50 + self.session.disconnect = MagicMock() + self.session.reconnect() + self.session.disconnect.assert_called_once() + + mock_new_session.connect.assert_called_once() + + self.assertEqual(self.session.msg_seq_num, 100) + self.assertFalse(self.session.restart_flag) + self.assertIsNone(self.session.restart_time) + + @patch("binance_fix_connector.fix_connector._create_session") + def test_restart_preserves_session_settings(self, mock_create_session): + """Test that all session settings are preserved during restart.""" + self.session.heart_bt_int = 45 + self.session.message_handling = 1 + self.session.response_mode = 2 + self.session.drop_copy_flag = "Y" + + mock_new_session = MagicMock() + mock_create_session.return_value = mock_new_session + + news_msg = self.create_mock_message( + FixMsgTypes.NEWS, {148: "Server restart scheduled in 10 minutes"} + ) + + with patch("binance_fix_connector.fix_connector.datetime") as mock_datetime: + mock_now = datetime(2024, 1, 1, 12, 0, 0) + mock_datetime.now.return_value = mock_now + + self.session.on_message_received([news_msg]) + + mock_create_session.assert_called_once() + + call_args = mock_create_session.call_args + + self.assertEqual(call_args.kwargs["api_key"], self.api_key) + self.assertEqual(call_args.kwargs["private_key"], self.private_key) + self.assertEqual(call_args.kwargs["endpoint"], self.endpoint) + self.assertEqual(call_args.kwargs["sender_comp_id"], self.sender_comp_id) + self.assertEqual(call_args.kwargs["heart_bt_int"], 45) + self.assertEqual(call_args.kwargs["message_handling"], 1) + self.assertEqual(call_args.kwargs["response_mode"], 2) + self.assertEqual(call_args.kwargs["drop_copy_flag"], "Y") + + @patch("binance_fix_connector.fix_connector._create_session") + @patch("binance_fix_connector.fix_connector.threading.Thread") + def test_timer_thread_is_daemon(self, mock_thread, mock_create_session): + """Test that the timer thread is created as a daemon.""" + mock_new_session = MagicMock() + mock_create_session.return_value = mock_new_session + news_msg = self.create_mock_message( + FixMsgTypes.NEWS, {148: "Server restart scheduled in 10 minutes"} + ) + + with patch("binance_fix_connector.fix_connector.datetime") as mock_datetime: + mock_now = datetime(2024, 1, 1, 12, 0, 0) + mock_datetime.now.return_value = mock_now + + self.session.on_message_received([news_msg]) + + mock_thread.assert_called_once() + args, kwargs = mock_thread.call_args + self.assertTrue(kwargs.get("daemon", False)) + + @patch("binance_fix_connector.fix_connector.time.sleep") + def test_restart_timer_triggers_reconnect(self, mock_sleep): + """Test that the timer thread calls reconnect after 10 minutes.""" + self.session.restart_flag = True + base_time = datetime(2024, 1, 1, 12, 0, 0) + future_time = base_time + timedelta(minutes=10) + self.session.restart_time = future_time + + self.session.reconnect = MagicMock() + + with patch("binance_fix_connector.fix_connector.datetime") as mock_datetime: + mock_datetime.now.side_effect = [ + base_time, + future_time + timedelta(seconds=1), + ] + + self.session._restart_timer_thread() + self.session.reconnect.assert_called_once() + + mock_sleep.assert_called() + + @patch("binance_fix_connector.fix_connector._create_session") + def test_news_message_does_not_trigger_restart_when_disabled( + self, mock_create_session + ): + """Test that receiving a NEWS message does NOT schedule a restart when restart=False.""" + session_no_restart = BinanceFixConnector( + api_key=self.api_key, + private_key=self.private_key, + endpoint=self.endpoint, + sender_comp_id=self.sender_comp_id, + target_comp_id=self.target_comp_id, + fix_version="FIX.4.4", + heart_bt_int=30, + message_handling=2, + restart=False, + ) + + session_no_restart.sock = MagicMock() + session_no_restart.is_connected = True + session_no_restart.logger = MagicMock() + + mock_new_session = MagicMock() + mock_create_session.return_value = mock_new_session + + news_msg = self.create_mock_message( + FixMsgTypes.NEWS, {148: "Server restart scheduled in 10 minutes"} + ) + + session_no_restart.on_message_received([news_msg]) + + self.assertFalse(session_no_restart.restart_flag) + self.assertIsNone(session_no_restart.restart_time) + self.assertIsNone(session_no_restart.restart_timer) + + mock_create_session.assert_not_called() + + session_no_restart.logger.info.assert_any_call( + "News message received from server." + ) + + @patch("binance_fix_connector.fix_connector._create_session") + def test_logout_message_when_restart_disabled(self, mock_create_session): + """Test that receiving a LOGOUT message when restart is disabled properly disconnects.""" + session_no_restart = BinanceFixConnector( + api_key=self.api_key, + private_key=self.private_key, + endpoint=self.endpoint, + sender_comp_id=self.sender_comp_id, + target_comp_id=self.target_comp_id, + fix_version="FIX.4.4", + heart_bt_int=30, + message_handling=2, + restart=False, + ) + + session_no_restart.sock = MagicMock() + session_no_restart.is_connected = True + session_no_restart.logger = MagicMock() + session_no_restart.disconnect = MagicMock() + session_no_restart.logout = MagicMock() + + logout_msg = self.create_mock_message(FixMsgTypes.LOGOUT) + + session_no_restart.on_message_received([logout_msg]) + + session_no_restart.logger.info.assert_called_with( + "Logout message received from server. Closing connection." + ) + session_no_restart.logout.assert_called_once() + session_no_restart.disconnect.assert_called_once() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/general/test_current_messages_limit_rate.py b/tests/general/test_current_messages_limit_rate.py index 8e2bb44..d62d552 100644 --- a/tests/general/test_current_messages_limit_rate.py +++ b/tests/general/test_current_messages_limit_rate.py @@ -154,7 +154,7 @@ def test_current_messages_limit_rate( ) # Consume until logout message - messages = client.retrieve_messages_until(message_type="5", timeout_seconds=1) + messages = client.retrieve_messages_until(message_type=["5"], timeout_seconds=1) # Logout acknowledgment log_out_message = messages[-1] diff --git a/tests/general/test_instrument_list.py b/tests/general/test_instrument_list.py index 8f235cc..8c10868 100644 --- a/tests/general/test_instrument_list.py +++ b/tests/general/test_instrument_list.py @@ -203,7 +203,7 @@ def test_instrument_list( # Consume until logout message messages = client_md.retrieve_messages_until( - message_type="5", timeout_seconds=1 + message_type=["5"], timeout_seconds=1 ) # Logout acknowledgment diff --git a/tests/market_stream/test_book_depth_stream.py b/tests/market_stream/test_book_depth_stream.py index bd476a9..8728891 100644 --- a/tests/market_stream/test_book_depth_stream.py +++ b/tests/market_stream/test_book_depth_stream.py @@ -170,7 +170,7 @@ def test_depth_stream( # Consume until logout message messages = client_md.retrieve_messages_until( - message_type="5", timeout_seconds=1 + message_type=["5"], timeout_seconds=1 ) # Logout acknowledgment diff --git a/tests/market_stream/test_book_ticker_stream.py b/tests/market_stream/test_book_ticker_stream.py index dba0161..d571ddb 100644 --- a/tests/market_stream/test_book_ticker_stream.py +++ b/tests/market_stream/test_book_ticker_stream.py @@ -163,7 +163,7 @@ def test_ticker_stream( # Consume until logout message messages = client_md.retrieve_messages_until( - message_type="5", timeout_seconds=1 + message_type=["5"], timeout_seconds=1 ) # Logout acknowledgment diff --git a/tests/market_stream/test_book_trade_stream.py b/tests/market_stream/test_book_trade_stream.py index fe94911..7f6a332 100644 --- a/tests/market_stream/test_book_trade_stream.py +++ b/tests/market_stream/test_book_trade_stream.py @@ -168,7 +168,7 @@ def test_trade_stream( # Consume until logout message messages = client_md.retrieve_messages_until( - message_type="5", timeout_seconds=1 + message_type=["5"], timeout_seconds=1 ) # Logout acknowledgment diff --git a/tests/trade/test_list_OTO_order.py b/tests/trade/test_list_OTO_order.py index bbceacc..0038a15 100644 --- a/tests/trade/test_list_OTO_order.py +++ b/tests/trade/test_list_OTO_order.py @@ -27,7 +27,7 @@ "A": "PENDING_NEW", "C": "EXPIRED", } -ORD_TYPES = {"1": "MARKET", "2": "LIMIT", "3": "STOP", "4": "STOP_LIMIT"} +ORD_TYPES = {"1": "MARKET", "2": "LIMIT", "3": "STOP", "4": "STOP_LIMIT", "P": "PEGGED"} SIDES = {"1": "BUY", "2": "SELL"} TIME_IN_FORCE = { "1": "GOOD_TILL_CANCEL", @@ -394,7 +394,7 @@ def test_list_OTO_order( # Consume until logout message messages = client_oe.retrieve_messages_until( - message_type="5", timeout_seconds=1 + message_type=["5"], timeout_seconds=1 ) # Logout acknowledgment diff --git a/tests/trade/test_new_order.py b/tests/trade/test_new_order.py index 9b9582f..1191dcd 100644 --- a/tests/trade/test_new_order.py +++ b/tests/trade/test_new_order.py @@ -152,7 +152,7 @@ def test_new_order( # Consume until logout message messages = client_oe.retrieve_messages_until( - message_type="5", timeout_seconds=1 + message_type=["5"], timeout_seconds=1 ) # Logout acknowledgment