hat.util

Common utility functions

  1"""Common utility functions"""
  2
  3import contextlib
  4import inspect
  5import socket
  6import typing
  7
  8
  9T = typing.TypeVar('T')
 10
 11
 12def register_type_alias(name: str):
 13    """Register type alias
 14
 15    This function is temporary hack replacement for typing.TypeAlias.
 16
 17    It is expected that calling location will have `name` in local namespace
 18    with type value. This function will wrap that type inside `typing.TypeVar`
 19    and update annotations.
 20
 21    """
 22    frame = inspect.stack()[1][0]
 23    f_locals = frame.f_locals
 24    t = f_locals[name]
 25    f_locals[name] = typing.TypeVar(name, t, t)
 26    f_locals.setdefault('__annotations__', {})[name] = typing.Type[t]
 27
 28
 29def first(xs: typing.Iterable[T],
 30          fn: typing.Callable[[T], typing.Any] = lambda _: True,
 31          default: typing.Optional[T] = None
 32          ) -> typing.Optional[T]:
 33    """Return the first element from iterable that satisfies predicate `fn`,
 34    or `default` if no such element exists.
 35
 36    Result of predicate `fn` can be of any type. Predicate is satisfied if it's
 37    return value is truthy.
 38
 39    Args:
 40        xs: collection
 41        fn: predicate
 42        default: default value
 43
 44    Example::
 45
 46        assert first(range(3)) == 0
 47        assert first(range(3), lambda x: x > 1) == 2
 48        assert first(range(3), lambda x: x > 2) is None
 49        assert first(range(3), lambda x: x > 2, 123) == 123
 50        assert first({1: 'a', 2: 'b', 3: 'c'}) == 1
 51        assert first([], default=123) == 123
 52
 53    """
 54    return next((i for i in xs if fn(i)), default)
 55
 56
 57class RegisterCallbackHandle(typing.NamedTuple):
 58    """Handle for canceling callback registration."""
 59
 60    cancel: typing.Callable[[], None]
 61    """cancel callback registration"""
 62
 63    def __enter__(self):
 64        return self
 65
 66    def __exit__(self, *args):
 67        self.cancel()
 68
 69
 70ExceptionCb = typing.Callable[[Exception], None]
 71"""Exception callback"""
 72
 73
 74class CallbackRegistry:
 75    """Registry that enables callback registration and notification.
 76
 77    Callbacks in the registry are notified sequentially with
 78    `CallbackRegistry.notify`. If a callback raises an exception, the
 79    exception is caught and `exception_cb` handler is called. Notification of
 80    subsequent callbacks is not interrupted. If handler is `None`, the
 81    exception is reraised and no subsequent callback is notified.
 82
 83    Example::
 84
 85        x = []
 86        y = []
 87        registry = CallbackRegistry()
 88
 89        registry.register(x.append)
 90        registry.notify(1)
 91
 92        with registry.register(y.append):
 93            registry.notify(2)
 94
 95        registry.notify(3)
 96
 97        assert x == [1, 2, 3]
 98        assert y == [2]
 99
100    """
101
102    def __init__(self,
103                 exception_cb: typing.Optional[ExceptionCb] = None):
104        self._exception_cb = exception_cb
105        self._cbs = []  # type: typing.List[typing.Callable]
106
107    def register(self,
108                 cb: typing.Callable
109                 ) -> RegisterCallbackHandle:
110        """Register a callback."""
111        self._cbs.append(cb)
112        return RegisterCallbackHandle(lambda: self._cbs.remove(cb))
113
114    def notify(self, *args, **kwargs):
115        """Notify all registered callbacks."""
116        for cb in self._cbs:
117            try:
118                cb(*args, **kwargs)
119            except Exception as e:
120                if self._exception_cb:
121                    self._exception_cb(e)
122                else:
123                    raise
124
125
126def get_unused_tcp_port(host: str = '127.0.0.1') -> int:
127    """Search for unused TCP port"""
128    with contextlib.closing(socket.socket()) as sock:
129        sock.bind((host, 0))
130        return sock.getsockname()[1]
131
132
133def get_unused_udp_port(host: str = '127.0.0.1') -> int:
134    """Search for unused UDP port"""
135    with contextlib.closing(socket.socket(type=socket.SOCK_DGRAM)) as sock:
136        sock.bind((host, 0))
137        return sock.getsockname()[1]
138
139
140# HACK type alias
141register_type_alias('ExceptionCb')
def register_type_alias(name: str):
13def register_type_alias(name: str):
14    """Register type alias
15
16    This function is temporary hack replacement for typing.TypeAlias.
17
18    It is expected that calling location will have `name` in local namespace
19    with type value. This function will wrap that type inside `typing.TypeVar`
20    and update annotations.
21
22    """
23    frame = inspect.stack()[1][0]
24    f_locals = frame.f_locals
25    t = f_locals[name]
26    f_locals[name] = typing.TypeVar(name, t, t)
27    f_locals.setdefault('__annotations__', {})[name] = typing.Type[t]

Register type alias

This function is temporary hack replacement for typing.TypeAlias.

It is expected that calling location will have name in local namespace with type value. This function will wrap that type inside typing.TypeVar and update annotations.

def first( xs: Iterable[~T], fn: Callable[[~T], Any] = <function <lambda>>, default: Optional[~T] = None) -> Optional[~T]:
30def first(xs: typing.Iterable[T],
31          fn: typing.Callable[[T], typing.Any] = lambda _: True,
32          default: typing.Optional[T] = None
33          ) -> typing.Optional[T]:
34    """Return the first element from iterable that satisfies predicate `fn`,
35    or `default` if no such element exists.
36
37    Result of predicate `fn` can be of any type. Predicate is satisfied if it's
38    return value is truthy.
39
40    Args:
41        xs: collection
42        fn: predicate
43        default: default value
44
45    Example::
46
47        assert first(range(3)) == 0
48        assert first(range(3), lambda x: x > 1) == 2
49        assert first(range(3), lambda x: x > 2) is None
50        assert first(range(3), lambda x: x > 2, 123) == 123
51        assert first({1: 'a', 2: 'b', 3: 'c'}) == 1
52        assert first([], default=123) == 123
53
54    """
55    return next((i for i in xs if fn(i)), default)

Return the first element from iterable that satisfies predicate fn, or default if no such element exists.

Result of predicate fn can be of any type. Predicate is satisfied if it's return value is truthy.

Arguments:
  • xs: collection
  • fn: predicate
  • default: default value

Example::

assert first(range(3)) == 0
assert first(range(3), lambda x: x > 1) == 2
assert first(range(3), lambda x: x > 2) is None
assert first(range(3), lambda x: x > 2, 123) == 123
assert first({1: 'a', 2: 'b', 3: 'c'}) == 1
assert first([], default=123) == 123
class RegisterCallbackHandle(typing.NamedTuple):
58class RegisterCallbackHandle(typing.NamedTuple):
59    """Handle for canceling callback registration."""
60
61    cancel: typing.Callable[[], None]
62    """cancel callback registration"""
63
64    def __enter__(self):
65        return self
66
67    def __exit__(self, *args):
68        self.cancel()

Handle for canceling callback registration.

RegisterCallbackHandle(cancel: Callable[[], NoneType])

Create new instance of RegisterCallbackHandle(cancel,)

cancel: Callable[[], NoneType]

cancel callback registration

Inherited Members
builtins.tuple
index
count
ExceptionCb: Type[Callable[[Exception], NoneType]] = ~ExceptionCb

Exception callback

class CallbackRegistry:
 75class CallbackRegistry:
 76    """Registry that enables callback registration and notification.
 77
 78    Callbacks in the registry are notified sequentially with
 79    `CallbackRegistry.notify`. If a callback raises an exception, the
 80    exception is caught and `exception_cb` handler is called. Notification of
 81    subsequent callbacks is not interrupted. If handler is `None`, the
 82    exception is reraised and no subsequent callback is notified.
 83
 84    Example::
 85
 86        x = []
 87        y = []
 88        registry = CallbackRegistry()
 89
 90        registry.register(x.append)
 91        registry.notify(1)
 92
 93        with registry.register(y.append):
 94            registry.notify(2)
 95
 96        registry.notify(3)
 97
 98        assert x == [1, 2, 3]
 99        assert y == [2]
100
101    """
102
103    def __init__(self,
104                 exception_cb: typing.Optional[ExceptionCb] = None):
105        self._exception_cb = exception_cb
106        self._cbs = []  # type: typing.List[typing.Callable]
107
108    def register(self,
109                 cb: typing.Callable
110                 ) -> RegisterCallbackHandle:
111        """Register a callback."""
112        self._cbs.append(cb)
113        return RegisterCallbackHandle(lambda: self._cbs.remove(cb))
114
115    def notify(self, *args, **kwargs):
116        """Notify all registered callbacks."""
117        for cb in self._cbs:
118            try:
119                cb(*args, **kwargs)
120            except Exception as e:
121                if self._exception_cb:
122                    self._exception_cb(e)
123                else:
124                    raise

Registry that enables callback registration and notification.

Callbacks in the registry are notified sequentially with CallbackRegistry.notify. If a callback raises an exception, the exception is caught and exception_cb handler is called. Notification of subsequent callbacks is not interrupted. If handler is None, the exception is reraised and no subsequent callback is notified.

Example::

x = []
y = []
registry = CallbackRegistry()

registry.register(x.append)
registry.notify(1)

with registry.register(y.append):
    registry.notify(2)

registry.notify(3)

assert x == [1, 2, 3]
assert y == [2]
CallbackRegistry(exception_cb: Optional[Callable[[Exception], NoneType]] = None)
103    def __init__(self,
104                 exception_cb: typing.Optional[ExceptionCb] = None):
105        self._exception_cb = exception_cb
106        self._cbs = []  # type: typing.List[typing.Callable]
def register(self, cb: Callable) -> hat.util.RegisterCallbackHandle:
108    def register(self,
109                 cb: typing.Callable
110                 ) -> RegisterCallbackHandle:
111        """Register a callback."""
112        self._cbs.append(cb)
113        return RegisterCallbackHandle(lambda: self._cbs.remove(cb))

Register a callback.

def notify(self, *args, **kwargs):
115    def notify(self, *args, **kwargs):
116        """Notify all registered callbacks."""
117        for cb in self._cbs:
118            try:
119                cb(*args, **kwargs)
120            except Exception as e:
121                if self._exception_cb:
122                    self._exception_cb(e)
123                else:
124                    raise

Notify all registered callbacks.

def get_unused_tcp_port(host: str = '127.0.0.1') -> int:
127def get_unused_tcp_port(host: str = '127.0.0.1') -> int:
128    """Search for unused TCP port"""
129    with contextlib.closing(socket.socket()) as sock:
130        sock.bind((host, 0))
131        return sock.getsockname()[1]

Search for unused TCP port

def get_unused_udp_port(host: str = '127.0.0.1') -> int:
134def get_unused_udp_port(host: str = '127.0.0.1') -> int:
135    """Search for unused UDP port"""
136    with contextlib.closing(socket.socket(type=socket.SOCK_DGRAM)) as sock:
137        sock.bind((host, 0))
138        return sock.getsockname()[1]

Search for unused UDP port