hat.util.cron

  1from collections.abc import Collection
  2import datetime
  3import typing
  4
  5
class AllSubExpr(typing.NamedTuple):
    """Wildcard subexpression.

    Produced by the parser for a field written as ``*``; it carries no
    data and matches every value.
    """
    pass
  8
  9
class ValueSubExpr(typing.NamedTuple):
    """Subexpression that matches exactly one value."""
    # the single value this subexpression matches
    value: int
 12
 13
class RangeSubExpr(typing.NamedTuple):
    """Subexpression that matches a contiguous range of values.

    Both bounds are inclusive: a value matches when
    ``from_ <= value <= to``.
    """
    # lower bound (inclusive)
    from_: int
    # upper bound (inclusive)
    to: int
 17
 18
class ListSubExpr(typing.NamedTuple):
    """Subexpression built from comma separated alternatives.

    A value matches when at least one of the contained subexpressions
    matches it.
    """
    # alternatives, in the order they were written
    subexprs: Collection[ValueSubExpr | RangeSubExpr]
 21
 22
# any subexpression that can occupy a single field of `Expr`
SubExpr: typing.TypeAlias = (AllSubExpr |
                             ValueSubExpr |
                             RangeSubExpr |
                             ListSubExpr)
 27
 28
class Expr(typing.NamedTuple):
    """Parsed cron expression consisting of five subexpressions.

    A datetime matches the expression only when every one of the five
    fields matches.
    """
    # minute of the hour (0-59)
    minute: SubExpr
    # hour of the day (0-23)
    hour: SubExpr
    # day of the month (1-31)
    day: SubExpr
    # month of the year (1-12)
    month: SubExpr
    # day of the week (0-6), where 0 is Sunday and 6 is Saturday
    day_of_week: SubExpr
 35
 36
 37def parse(expr_str: str) -> Expr:
 38    subexpr_strs = expr_str.split()
 39    if len(subexpr_strs) != 5:
 40        raise ValueError('invalid number of subexpressions')
 41
 42    return Expr(
 43        minute=_parse_subexpr(subexpr_strs[0], _parse_minute),
 44        hour=_parse_subexpr(subexpr_strs[1], _parse_hour),
 45        day=_parse_subexpr(subexpr_strs[2], _parse_day),
 46        month=_parse_subexpr(subexpr_strs[3], _parse_month),
 47        day_of_week=_parse_subexpr(subexpr_strs[4], _parse_day_of_week))
 48
 49
 50def next(expr: Expr,
 51         t: datetime.datetime
 52         ) -> datetime.datetime:
 53    t = t.replace(second=0, microsecond=0)
 54
 55    while True:
 56        t = t + datetime.timedelta(minutes=1)
 57
 58        if match(expr, t):
 59            return t
 60
 61
 62def match(expr: Expr,
 63          t: datetime.datetime
 64          ) -> bool:
 65    if t.second or t.microsecond:
 66        return False
 67
 68    if not _match_subexpr(expr.minute, t.minute):
 69        return False
 70
 71    if not _match_subexpr(expr.hour, t.hour):
 72        return False
 73
 74    if not _match_subexpr(expr.day, t.day):
 75        return False
 76
 77    if not _match_subexpr(expr.month, t.month):
 78        return False
 79
 80    if not _match_subexpr(expr.day_of_week, t.isoweekday() % 7):
 81        return False
 82
 83    return True
 84
 85
 86def _parse_subexpr(subexpr_str, value_parser):
 87    if subexpr_str == '*':
 88        return AllSubExpr()
 89
 90    if ',' in subexpr_str:
 91        return ListSubExpr([_parse_subexpr(i, value_parser)
 92                            for i in subexpr_str.split(',')])
 93
 94    if '-' in subexpr_str:
 95        from_str, to_str = subexpr_str.split('-')
 96        return RangeSubExpr(value_parser(from_str), value_parser(to_str))
 97
 98    return ValueSubExpr(value_parser(subexpr_str))
 99
100
def _match_subexpr(subexpr, value):
    """Check whether `value` satisfies a single subexpression.

    Args:
        subexpr: one of the subexpression types
        value: integer component of a datetime

    Returns:
        ``True`` if the value matches, ``False`` otherwise

    Raises:
        ValueError: if `subexpr` is not a known subexpression type

    """
    if isinstance(subexpr, ListSubExpr):
        # a list matches as soon as any of its alternatives matches
        for alternative in subexpr.subexprs:
            if _match_subexpr(alternative, value):
                return True
        return False

    if isinstance(subexpr, RangeSubExpr):
        lower, upper = subexpr
        return lower <= value and value <= upper

    if isinstance(subexpr, ValueSubExpr):
        return subexpr.value == value

    if isinstance(subexpr, AllSubExpr):
        return True

    raise ValueError('unsupported subexpression')
115
116
117def _parse_minute(value_str):
118    value = int(value_str)
119    if not (0 <= value <= 59):
120        raise ValueError('invalid minute value')
121
122    return value
123
124
125def _parse_hour(value_str):
126    value = int(value_str)
127    if not (0 <= value <= 23):
128        raise ValueError('invalid hour value')
129
130    return value
131
132
133def _parse_day(value_str):
134    value = int(value_str)
135    if not (1 <= value <= 31):
136        raise ValueError('invalid day value')
137
138    return value
139
140
141def _parse_month(value_str):
142    value = int(value_str)
143    if not (1 <= value <= 12):
144        raise ValueError('invalid month value')
145
146    return value
147
148
149def _parse_day_of_week(value_str):
150    value = int(value_str)
151    if not (0 <= value <= 6):
152        raise ValueError('invalid day of week value')
153
154    return value
class AllSubExpr(typing.NamedTuple):
7class AllSubExpr(typing.NamedTuple):
8    pass

AllSubExpr()

AllSubExpr()

Create new instance of AllSubExpr()

class ValueSubExpr(typing.NamedTuple):
11class ValueSubExpr(typing.NamedTuple):
12    value: int

ValueSubExpr(value,)

ValueSubExpr(value: int)

Create new instance of ValueSubExpr(value,)

value: int

Alias for field number 0

class RangeSubExpr(typing.NamedTuple):
15class RangeSubExpr(typing.NamedTuple):
16    from_: int
17    to: int

RangeSubExpr(from_, to)

RangeSubExpr(from_: int, to: int)

Create new instance of RangeSubExpr(from_, to)

from_: int

Alias for field number 0

to: int

Alias for field number 1

class ListSubExpr(typing.NamedTuple):
20class ListSubExpr(typing.NamedTuple):
21    subexprs: Collection[ValueSubExpr | RangeSubExpr]

ListSubExpr(subexprs,)

ListSubExpr(subexprs: Collection[ValueSubExpr | RangeSubExpr])

Create new instance of ListSubExpr(subexprs,)

subexprs: Collection[ValueSubExpr | RangeSubExpr]

Alias for field number 0

SubExpr: TypeAlias = AllSubExpr | ValueSubExpr | RangeSubExpr | ListSubExpr
class Expr(typing.NamedTuple):
30class Expr(typing.NamedTuple):
31    minute: SubExpr
32    hour: SubExpr
33    day: SubExpr
34    month: SubExpr
35    day_of_week: SubExpr

Expr(minute, hour, day, month, day_of_week)

Create new instance of Expr(minute, hour, day, month, day_of_week)

minute: SubExpr

Alias for field number 0

hour: SubExpr

Alias for field number 1

day: SubExpr

Alias for field number 2

month: SubExpr

Alias for field number 3

day_of_week: SubExpr

Alias for field number 4

def parse(expr_str: str) -> Expr:
38def parse(expr_str: str) -> Expr:
39    subexpr_strs = expr_str.split()
40    if len(subexpr_strs) != 5:
41        raise ValueError('invalid number of subexpressions')
42
43    return Expr(
44        minute=_parse_subexpr(subexpr_strs[0], _parse_minute),
45        hour=_parse_subexpr(subexpr_strs[1], _parse_hour),
46        day=_parse_subexpr(subexpr_strs[2], _parse_day),
47        month=_parse_subexpr(subexpr_strs[3], _parse_month),
48        day_of_week=_parse_subexpr(subexpr_strs[4], _parse_day_of_week))
def next(expr: Expr, t: datetime.datetime) -> datetime.datetime:
51def next(expr: Expr,
52         t: datetime.datetime
53         ) -> datetime.datetime:
54    t = t.replace(second=0, microsecond=0)
55
56    while True:
57        t = t + datetime.timedelta(minutes=1)
58
59        if match(expr, t):
60            return t
def match(expr: Expr, t: datetime.datetime) -> bool:
63def match(expr: Expr,
64          t: datetime.datetime
65          ) -> bool:
66    if t.second or t.microsecond:
67        return False
68
69    if not _match_subexpr(expr.minute, t.minute):
70        return False
71
72    if not _match_subexpr(expr.hour, t.hour):
73        return False
74
75    if not _match_subexpr(expr.day, t.day):
76        return False
77
78    if not _match_subexpr(expr.month, t.month):
79        return False
80
81    if not _match_subexpr(expr.day_of_week, t.isoweekday() % 7):
82        return False
83
84    return True