diff --git a/ipyparallel/engine/app.py b/ipyparallel/engine/app.py index 6626fc61..64d100d3 100755 --- a/ipyparallel/engine/app.py +++ b/ipyparallel/engine/app.py @@ -6,6 +6,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. import asyncio +import inspect import json import os import signal @@ -17,6 +18,7 @@ import ipykernel import zmq +from ipykernel.iostream import IOPubThread from ipykernel.kernelapp import IPKernelApp from ipykernel.zmqshell import ZMQInteractiveShell from IPython.core.profiledir import ProfileDir @@ -667,17 +669,18 @@ def urls(key): # create iopub stream: iopub_addr = url('iopub') - iopub_socket = self.iopub_socket = ctx.socket(zmq.PUB) + thread_params = {} + if 'session' in inspect.signature(IOPubThread).parameters: + iopub_socket_type = zmq.XPUB + thread_params["session"] = self.session + else: + iopub_socket_type = zmq.PUB + iopub_socket = self.iopub_socket = ctx.socket(iopub_socket_type) iopub_socket.SNDHWM = 0 iopub_socket.setsockopt(zmq.IDENTITY, identity) connect(iopub_socket, iopub_addr) - try: - from ipykernel.iostream import IOPubThread - except ImportError: - pass - else: - iopub_socket = IOPubThread(iopub_socket) - iopub_socket.start() + iopub_socket = IOPubThread(iopub_socket, **thread_params) + iopub_socket.start() # disable history: self.config.HistoryManager.hist_file = ':memory:'