Skip to content
Merged
26 changes: 26 additions & 0 deletions examples/z_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# Contributors:
# ZettaScale Zenoh Team, <zenoh@zettascale.tech>
#
import time

import zenoh


Expand All @@ -24,6 +26,30 @@ def main(conf: zenoh.Config):
print(f"zid: {info.zid()}")
print(f"routers: {info.routers_zid()}")
print(f"peers: {info.peers_zid()}")
print("transports:")
for t in info.transports():
print(f" - {t}")
print("links:")
for l in info.links():
print(f" - {l}")

# listen for transport and link events using try_recv polling
transport_listener = info.declare_transport_events_listener(history=False)
link_listener = info.declare_link_events_listener(history=False)

print("Listening for transport and link events... (press Ctrl+C to exit)")
try:
while True:
while (event := transport_listener.try_recv()) is not None:
print(f"Transport event: {event}")
while (event := link_listener.try_recv()) is not None:
print(f"Link event: {event}")
time.sleep(0.1)
except KeyboardInterrupt:
pass

transport_listener.undeclare()
link_listener.undeclare()


# --- Command line argument parsing --- --- --- --- --- ---
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ pub(crate) mod zenoh {
},
sample::{Locality, Sample, SampleKind, SourceInfo},
scouting::{scout, Hello, Scout},
session::{open, EntityGlobalId, Session, SessionInfo},
session::{
open, EntityGlobalId, Link, LinkEvent, LinkEventsListener, Session, SessionInfo,
Transport, TransportEvent, TransportEventsListener,
},
time::{Timestamp, TimestampId, NTP64},
ZError,
};
Expand Down
289 changes: 285 additions & 4 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,23 @@ use std::time::Duration;

use pyo3::{
prelude::*,
types::{PyDict, PyList, PyTuple},
types::{PyDict, PyIterator, PyList, PyTuple},
IntoPyObjectExt,
};
use zenoh::{session::EntityId, Wait};

use crate::{
bytes::{Encoding, ZBytes},
cancellation::CancellationToken,
config::{Config, ZenohId},
config::{Config, WhatAmI, ZenohId},
handlers::{into_handler, HandlerImpl},
key_expr::KeyExpr,
liveliness::Liveliness,
macros::{build, wrapper},
macros::{build, option_wrapper, wrapper},
pubsub::{Publisher, Subscriber},
qos::{CongestionControl, Priority, Reliability},
query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, ReplyKeyExpr, Selector},
sample::{Locality, SourceInfo},
sample::{Locality, SampleKind, SourceInfo},
time::Timestamp,
utils::{duration, wait, IntoPython, MapInto},
};
Expand Down Expand Up @@ -311,6 +312,150 @@ pub(crate) fn open(py: Python, config: Config) -> PyResult<Session> {

wrapper!(zenoh::session::SessionInfo);

wrapper!(zenoh::session::Transport);

#[pymethods]
impl Transport {
#[getter]
fn zid(&self) -> ZenohId {
(*self.0.zid()).into()
}

#[getter]
fn whatami(&self) -> WhatAmI {
self.0.whatami().into()
}

#[getter]
fn is_qos(&self) -> bool {
self.0.is_qos()
}

#[cfg(feature = "shared-memory")]
#[getter]
fn is_shm(&self) -> bool {
self.0.is_shm()
}

#[getter]
fn is_multicast(&self) -> bool {
self.0.is_multicast()
}

fn __eq__(&self, other: &Transport) -> bool {
self.0 == other.0
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
}

wrapper!(zenoh::session::Link);

#[pymethods]
impl Link {
#[getter]
fn zid(&self) -> ZenohId {
(*self.0.zid()).into()
}

#[getter]
fn src(&self) -> String {
self.0.src().to_string()
}

#[getter]
fn dst(&self) -> String {
self.0.dst().to_string()
}

#[getter]
fn group(&self) -> Option<String> {
self.0.group().map(|g| g.to_string())
}

#[getter]
fn mtu(&self) -> u16 {
self.0.mtu()
}

#[getter]
fn is_streamed(&self) -> bool {
self.0.is_streamed()
}

#[getter]
fn interfaces<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just return a Vec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this approach is used for all methods of SessionInfo, better to make it in separate fix

let list = PyList::empty(py);
for interface in self.0.interfaces() {
list.append(interface)?;
}
Ok(list)
}

#[getter]
fn auth_identifier(&self) -> Option<String> {
self.0.auth_identifier().map(|s| s.to_string())
}

#[getter]
fn priorities(&self) -> Option<(u8, u8)> {
self.0.priorities()
}

#[getter]
fn reliability(&self) -> Option<Reliability> {
self.0.reliability().map(Into::into)
}

fn __eq__(&self, other: &Link) -> bool {
self.0 == other.0
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
}

wrapper!(zenoh::session::TransportEvent);

#[pymethods]
impl TransportEvent {
#[getter]
fn kind(&self) -> SampleKind {
self.0.kind().into()
}

#[getter]
fn transport(&self) -> Transport {
self.0.transport().clone().into()
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
}

wrapper!(zenoh::session::LinkEvent);

#[pymethods]
impl LinkEvent {
#[getter]
fn kind(&self) -> SampleKind {
self.0.kind().into()
}

#[getter]
fn link(&self) -> Link {
self.0.link().clone().into()
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
}

#[pymethods]
impl SessionInfo {
fn zid(&self, py: Python) -> ZenohId {
Expand All @@ -333,9 +478,145 @@ impl SessionInfo {
Ok(list)
}

fn transports<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
let list = PyList::empty(py);
for transport in py.allow_threads(|| self.0.transports().wait()) {
list.append(transport.into_pyobject(py))?;
}
Ok(list)
}

fn links<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
let list = PyList::empty(py);
for link in py.allow_threads(|| self.0.links().wait()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just return a Vec

list.append(link.into_pyobject(py))?;
}
Ok(list)
}

#[pyo3(signature = (handler = None, *, history = None))]
fn declare_transport_events_listener(
&self,
py: Python,
handler: Option<&Bound<PyAny>>,
history: Option<bool>,
) -> PyResult<TransportEventsListener> {
let (handler, background) = into_handler(py, handler, None)?;
let builder = build!(self.0.transport_events_listener(), history);
let mut listener = wait(py, builder.with(handler))?;
if background {
listener.set_background(true);
}
Ok(listener.into())
}

#[pyo3(signature = (handler = None, *, history = None))]
fn declare_link_events_listener(
&self,
py: Python,
handler: Option<&Bound<PyAny>>,
history: Option<bool>,
) -> PyResult<LinkEventsListener> {
let (handler, background) = into_handler(py, handler, None)?;
let builder = build!(self.0.link_events_listener(), history);
let mut listener = wait(py, builder.with(handler))?;
if background {
listener.set_background(true);
}
Ok(listener.into())
}

// TODO __repr__
}

option_wrapper!(
zenoh::session::TransportEventsListener<HandlerImpl<TransportEvent>>,
"Undeclared transport events listener"
);

#[pymethods]
impl TransportEventsListener {
fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> {
this
}

#[pyo3(signature = (*_args, **_kwargs))]
fn __exit__(
&mut self,
py: Python,
_args: &Bound<PyTuple>,
_kwargs: Option<&Bound<PyDict>>,
) -> PyResult<PyObject> {
self.undeclare(py)?;
Ok(py.None())
}

#[getter]
fn handler(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().into_py_any(py)
}

fn try_recv(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().try_recv(py)
}

fn recv(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().recv(py)
}

fn undeclare(&mut self, py: Python) -> PyResult<()> {
wait(py, self.take()?.undeclare())
}

fn __iter__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyIterator>> {
self.handler(py)?.bind(py).try_iter()
}
}

option_wrapper!(
zenoh::session::LinkEventsListener<HandlerImpl<LinkEvent>>,
"Undeclared link events listener"
);

#[pymethods]
impl LinkEventsListener {
fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> {
this
}

#[pyo3(signature = (*_args, **_kwargs))]
fn __exit__(
&mut self,
py: Python,
_args: &Bound<PyTuple>,
_kwargs: Option<&Bound<PyDict>>,
) -> PyResult<PyObject> {
self.undeclare(py)?;
Ok(py.None())
}

#[getter]
fn handler(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().into_py_any(py)
}

fn try_recv(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().try_recv(py)
}

fn recv(&self, py: Python) -> PyResult<PyObject> {
self.get_ref()?.handler().recv(py)
}

fn undeclare(&mut self, py: Python) -> PyResult<()> {
wait(py, self.take()?.undeclare())
}

fn __iter__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyIterator>> {
self.handler(py)?.bind(py).try_iter()
}
}

wrapper!(zenoh::session::EntityGlobalId: Clone);

#[pymethods]
Expand Down
Loading
Loading