[@@@warning "-27-50"] (** Discrete-time node Low-level representation of a discrete node as a current [state] and [step] and [reset] functions. The type parameters represent the step function's inputs and outputs and the reset parameter. *) type ('i, 'o, 'r) dnode = DNode : { state : 's; (** current state *) step : 's -> 'i -> 's * 'o; (** step function *) reset : 's -> 'r -> 's; (** reset function *) } -> ('i, 'o, 'r) dnode (** Run a discrete node on a list of inputs *) let drun (DNode n : ('i, 'o, 'r) dnode) (i : 'i list) : 'o list = List.fold_left_map n.step n.state i |> snd (** Time values Must be positive. *) type time = float (** Interval-defined functions A value [v] of type [α dense] represents a function from [time] to [α] defined on the interval [[0, v.h]]. Calling [v.f] with a value outside these bounds is undefined. For convenience, we assume all functions are well-behaved w.r.t. the numerical methods we use. In particular, if [v.h = 0], then the function is defined at a single instant. *) type 'a dense = { h : time; (** horizon *) f : time -> 'a } (** [f : [0, h] -> α] *) (** Continuous-time signal A value of type [α signal] is either empty or an interval-defined function. A sequence of values of type [α signal] represents the function obtained by the concatenation of the interval domains of each [α value] in the sequence. For instance, [[Some { h=3.0; f=f1 }; None; Some { h=2.0; f=f2 }]] represents the function [{ h=5.0; f=fun t -> if t <= 3.0 then f1 t else f2 (t - 3.0) }]. *) type 'a signal = 'a dense option (** Initial value problem (IVP) Its solution is a function [f : [0, h] -> 'y] such that: - [f 0.0 = y0] - [(df/dt) t = fder t (f t)] *) type ('y, 'yder) ivp = { y0 : 'y; (** initial position *) fder : time -> 'y -> 'yder; (** derivative function *) h : time; } (** maximal horizon *) (** ODE solver Given a requested horizon [t], the solver returns an approximation of the solution to the IVP on [[0, t']] (where [t' ≤ t]). Successive steps compute successive parts of the solution. Its (re)initialization parameter is the IVP to solve. That is, the solver must be initialized with an IVP before use. *) type ('y, 'yder) csolver = (time, 'y dense, ('y, 'yder) ivp) dnode (* ↑ ↑ ↑ *) (* input output reset parameter *) (** Zero-crossing problem (ZCP) Paired with an approximation [f : [0, h] -> 'y], its solution is the least instant [t ∈ [0, h]] such that [fzer t (y t)] crosses zero at that instant, or [h] if no such crossing occurs. *) type ('y, 'zin) zcp = { y0 : 'y; (** initial position *) fzer : time -> 'y -> 'zin; (** zero-crossing function *) h : time; } (** maximal horizon *) (** Zero-crossing solver Given an approximation [f : [0, h] -> 'y], the solver returns an instant [t ∈ [0, h]] solving the ZCP, and an indicator ['zout] of what zero-crossing event occured. *) type ('y, 'zin, 'zout) zsolver = ('y dense, time * 'zout, ('y, 'zin) zcp) dnode (* ↑ ↑ ↑ *) (* input output reset parameter *) (** Full solver Composes an ODE solver with a zero-crossing solver. *) type ('y, 'yder, 'zin, 'zout) solver = (time, 'y dense * 'zout, ('y, 'yder) ivp * ('y, 'zin) zcp) dnode (* ↑ ↑ ↑ *) (* input output reset parameter *) (** Compose an ODE solver and a zero-crossing solver. *) let compose_solvers : ('y, 'yder) csolver -> ('y, 'zin, 'zout) zsolver -> ('y, 'yder, 'zin, 'zout) solver = fun (DNode csolver) (DNode zsolver) -> let state = (csolver.state, zsolver.state) in let step (cstate, zstate) h = let cstate, f = csolver.step cstate h in let zstate, (h, zout) = zsolver.step zstate f in (cstate, zstate), ({ f with h }, zout) in let reset (cstate, zstate) (ivp, zcp) = (csolver.reset cstate ivp, zsolver.reset zstate zcp) in DNode { state; step; reset } (** Hybrid (discrete-time and continuous-time) node A hybrid node contains both a discrete [step] function and a derivative function [fder], zero-crossing function [fzer], and output function [fout], representing continuous behaviour. Its state ['s] contains a continuous part ['y], which evolves during continuous behaviour; functions [cget] and [cset] allow reading of and writing to this continuous part. The zero-crossing function [fzer] returns a vector ['zin] of all the expressions to monitor for zero-crossings. When a zero-crossing event is detected, the state may be updated using the [zset] function. After a discrete step, the model may require another discrete step to be performed before resuming continuous behaviour, which it notifies through the [jump] function. *) type ('i, 'o, 'r, 'y, 'yder, 'zin, 'zout) hnode = HNode : { state : 's; (** current state *) step : 's -> time -> 'i -> 's * 'o; (** discrete step function *) fder : 's -> time -> 'i -> 'y -> 'yder; (** derivative function *) fzer : 's -> time -> 'i -> 'y -> 'zin; (** zero-crossing function *) fout : 's -> time -> 'i -> 'y -> 'o; (** continuous output function *) reset : 's -> 'r -> 's; (** reset function *) cget : 's -> 'y; (** continuous state getter *) cset : 's -> 'y -> 's; (** continuous state setter *) zset : 's -> 'zout -> 's; (** zero-crossing information setter *) jump : 's -> bool; (** discrete go-again function *) } -> ('i, 'o, 'r, 'y, 'yder, 'zin, 'zout) hnode (* We consider the simulation of a hybrid node with a solver as a particular case of a discrete node. That is, the simulation has an internal state, step function and reset function. At each step of the simulation, we operate according to one of two modes: discrete and continuous. In discrete mode, we perform a discrete step using the model's [step] function. Physical time does not advance; and so the output is a function defined at a single instant [0.0]. Additionally, we may choose to reset the solver and switch to continuous mode in the next step, according to the result of the model's [jump] function. In continuous mode, we call the solver to obtain an approximation of the solution to the model's IVP, obtained with its [fder] function. Physical time advances up to the horizon reached by the solver. If a zero-crossing event occurs, we switch to discrete mode for the next step; otherwise we remain in continuous mode. *) (** Simulation mode Either discrete ([D]) or continuous ([C]). *) type mode = D | C (** Simulation state The simulation state must store the states of both the model and solver. Additionally, it must store the current input, physical time, and step mode. *) type ('i, 'o, 'r, 'y) state = State : { solver : ('y, 'yder, 'zin, 'zout) solver; (** solver state *) model : ('i, 'o, 'r, 'y, 'yder, 'zin, 'zout) hnode; (** model state *) input : 'i signal; (** current input *) time : time; (** current time *) mode : mode; (** current step mode *) } -> ('i, 'o, 'r, 'y) state (** Discrete simulation step Performs a discrete step of the model, and resets the solver if required by the model. *) let dstep (State ({ model = HNode m; solver = DNode s; _ } as st)) = let i = Option.get st.input in (* Step the model *) let ms, o = m.step m.state st.time (i.f st.time) in let model = HNode { m with state = ms } in let st = if m.jump ms then (* If the model asks for another discrete step, stay in discrete mode; *) State { st with model } else if st.time >= i.h then (* if we reached the end of the input, remain idle; *) State { st with input = None; model; time = 0. } else (* otherwise, reset the solver and switch to continuous mode. *) let y0 = m.cget ms and h = i.h -. st.time and ofs = (+.) st.time in let ivp = { h; y0; fder = fun t -> m.fder ms (ofs t) (i.f (ofs t)) } in let zcp = { h; y0; fzer = fun t -> m.fzer ms (ofs t) (i.f (ofs t)) } in let solver = DNode { s with state = s.reset s.state (ivp, zcp) } in let input = Some { h = i.h -. st.time; f = fun t -> i.f (ofs t) } in State { model; solver; mode = C; time = 0.; input } in (* Return the state and the output function (defined only at [0.0]). *) st, Some { h = 0.; f = fun _ -> o } (** Continuous simulation step Calls the solver to solve the IVP, and switch to discrete mode if a zero-crossing event occurs or if the model asks for it. *) let cstep (State ({ model = HNode m; solver = DNode s; _ } as st)) = let i = Option.get st.input in (* Step the solver. *) let ss, (y, z) = s.step s.state i.h in let solver = DNode { s with state = ss } in (* Update the model's state. *) let ms = m.zset (m.cset m.state (y.f y.h)) z in let model = HNode { m with state = ms } in (* Create the output function. *) let out = { y with f = fun t -> m.fout ms (st.time +. t) (i.f (st.time +. t)) (y.f t) } in (* Compute the mode for the next step. *) let mode = if m.jump ms || st.time +. y.h >= i.h then D else C in (* Return the state and the output function. *) State { st with model; solver; mode; time = st.time +. y.h }, Some out (** Simulate a hybrid model with a solver The [step] function calls [dstep] or [cstep] depending on the current mode. If the current input is [None], nothing is done; the simulation is awaiting input; and we are allowed to provide a new function as input. If the current input is [Some f], the [step] function expects [None] as input; providing a new input value before the simulation is done with the previous one is an error. *) let hsim : ('i, 'o, 'r, 'y, 'yder, 'zin, 'zout) hnode -> ('y, 'yder, 'zin, 'zout) solver -> ('i signal, 'o signal, 'r) dnode = fun model solver -> let state = State { model; solver; input = None; time = 0.; mode = D } in let step (State s as st) input = match (input, s.input, s.mode) with | (Some _, None, _) -> (* Accept the new input and reset the solver. *) dstep (State { s with input; time = 0.; mode = D }) | (None, Some _, D) -> (* Perform a discrete step on the current input. *) dstep st | None, Some _, C -> (* Perform a continuous step on the current input. *) cstep st | (None, None, _) -> (* Do nothing and wait for the next input. *) (st, None) | (Some _, Some _, _) -> (* Got the next input too early! *) invalid_arg "Not done processing previous input" in let reset (State ({ model = HNode m; _ } as s)) r = (* Reset the model; the solver will reset at the first discrete step. *) let model = HNode { m with state = m.reset m.state r } in State { s with model; input = None; time = 0.; mode = D } in DNode { state; step; reset } (** Run a simulation on a list of inputs For each input value, we step the simulation as many times as needed for it to reach the horizon. *) let hrun (model : ('i, 'o, 'r, 'y, 'yder, 'zin, 'zout) hnode) (solver : ('y, 'yder, 'zin, 'zout) solver) (i : 'i dense list) : 'o dense list = let sim = hsim model solver and i = List.map Option.some i in let rec step os (DNode sim) i = let state, o = sim.step sim.state i in let sim = DNode { sim with state } in if o = None then (sim, List.rev_map Option.get os) else step (o :: os) sim None in List.fold_left_map (step []) sim i |> snd |> List.flatten