refactor: handle DAP events in editor main loop

pull/574/head
Dmitry Sharshakov 3 years ago
parent 3fc501c99f
commit a938f5a87a
No known key found for this signature in database
GPG Key ID: 471FD32E15FD8473

1
Cargo.lock generated

@ -389,6 +389,7 @@ dependencies = [
"signal-hook", "signal-hook",
"signal-hook-tokio", "signal-hook-tokio",
"tokio", "tokio",
"tokio-stream",
"toml", "toml",
] ]

@ -1,5 +1,5 @@
use crate::{ use crate::{
transport::{Event, Payload, Request, Transport}, transport::{Payload, Request, Transport},
types::*, types::*,
Result, Result,
}; };
@ -9,19 +9,13 @@ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, SocketAddr},
path::PathBuf, path::PathBuf,
process::Stdio, process::Stdio,
sync::{ sync::atomic::{AtomicU64, Ordering},
atomic::{AtomicU64, Ordering},
Arc,
},
}; };
use tokio::{ use tokio::{
io::{AsyncBufRead, AsyncWrite, BufReader, BufWriter}, io::{AsyncBufRead, AsyncWrite, BufReader, BufWriter},
net::TcpStream, net::TcpStream,
process::{Child, Command}, process::{Child, Command},
sync::{ sync::mpsc::{channel, unbounded_channel, UnboundedReceiver, UnboundedSender},
mpsc::{channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
Mutex,
},
time, time,
}; };
@ -32,8 +26,6 @@ pub struct Client {
server_tx: UnboundedSender<Request>, server_tx: UnboundedSender<Request>,
request_counter: AtomicU64, request_counter: AtomicU64,
capabilities: Option<DebuggerCapabilities>, capabilities: Option<DebuggerCapabilities>,
awaited_events: Arc<Mutex<HashMap<String, Sender<Event>>>>,
// //
pub breakpoints: HashMap<PathBuf, Vec<SourceBreakpoint>>, pub breakpoints: HashMap<PathBuf, Vec<SourceBreakpoint>>,
// TODO: multiple threads support // TODO: multiple threads support
@ -46,8 +38,9 @@ impl Client {
tx: Box<dyn AsyncWrite + Unpin + Send>, tx: Box<dyn AsyncWrite + Unpin + Send>,
id: usize, id: usize,
process: Option<Child>, process: Option<Child>,
) -> Result<Self> { ) -> Result<(Self, UnboundedReceiver<Payload>)> {
let (server_rx, server_tx) = Transport::start(rx, tx, id); let (server_rx, server_tx) = Transport::start(rx, tx, id);
let (client_rx, client_tx) = unbounded_channel();
let client = Self { let client = Self {
id, id,
@ -55,24 +48,30 @@ impl Client {
server_tx, server_tx,
request_counter: AtomicU64::new(0), request_counter: AtomicU64::new(0),
capabilities: None, capabilities: None,
awaited_events: Arc::new(Mutex::new(HashMap::default())),
// //
breakpoints: HashMap::new(), breakpoints: HashMap::new(),
stack_pointer: None, stack_pointer: None,
}; };
tokio::spawn(Self::recv(Arc::clone(&client.awaited_events), server_rx)); tokio::spawn(Self::recv(server_rx, client_rx));
Ok(client) Ok((client, client_tx))
} }
pub async fn tcp(addr: std::net::SocketAddr, id: usize) -> Result<Self> { pub async fn tcp(
addr: std::net::SocketAddr,
id: usize,
) -> Result<(Self, UnboundedReceiver<Payload>)> {
let stream = TcpStream::connect(addr).await?; let stream = TcpStream::connect(addr).await?;
let (rx, tx) = stream.into_split(); let (rx, tx) = stream.into_split();
Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), id, None) Self::streams(Box::new(BufReader::new(rx)), Box::new(tx), id, None)
} }
pub fn stdio(cmd: &str, args: Vec<&str>, id: usize) -> Result<Self> { pub fn stdio(
cmd: &str,
args: Vec<&str>,
id: usize,
) -> Result<(Self, UnboundedReceiver<Payload>)> {
let process = Command::new(cmd) let process = Command::new(cmd)
.args(args) .args(args)
.stdin(Stdio::piped()) .stdin(Stdio::piped())
@ -114,7 +113,7 @@ impl Client {
args: Vec<&str>, args: Vec<&str>,
port_format: &str, port_format: &str,
id: usize, id: usize,
) -> Result<Self> { ) -> Result<(Self, UnboundedReceiver<Payload>)> {
let port = Self::get_port().await.unwrap(); let port = Self::get_port().await.unwrap();
let process = Command::new(cmd) let process = Command::new(cmd)
@ -145,41 +144,20 @@ impl Client {
) )
} }
async fn recv( async fn recv(mut server_rx: UnboundedReceiver<Payload>, client_tx: UnboundedSender<Payload>) {
awaited_events: Arc<Mutex<HashMap<String, Sender<Event>>>>,
mut server_rx: UnboundedReceiver<Payload>,
) {
while let Some(msg) = server_rx.recv().await { while let Some(msg) = server_rx.recv().await {
match msg { match msg {
Payload::Event(ev) => { Payload::Event(ev) => {
let name = ev.event.clone(); client_tx.send(Payload::Event(ev)).expect("Failed to send");
let hashmap = awaited_events.lock().await;
let tx = hashmap.get(&name);
match tx {
Some(tx) => match tx.send(ev).await {
Ok(_) => (),
Err(_) => error!(
"Tried sending event into a closed channel (name={:?})",
name
),
},
None => {
info!("unhandled event {}", name);
// client_tx.send(Payload::Event(ev)).expect("Failed to send");
}
}
} }
Payload::Response(_) => unreachable!(), Payload::Response(_) => unreachable!(),
Payload::Request(_) => todo!(), Payload::Request(req) => {
client_tx
.send(Payload::Request(req))
.expect("Failed to send");
} }
} }
} }
pub async fn listen_for_event(&self, name: String) -> Receiver<Event> {
let (rx, tx) = channel(1);
self.awaited_events.lock().await.insert(name.clone(), rx);
tx
} }
pub fn id(&self) -> usize { pub fn id(&self) -> usize {
@ -248,8 +226,6 @@ impl Client {
} }
pub async fn launch(&mut self, args: serde_json::Value) -> Result<()> { pub async fn launch(&mut self, args: serde_json::Value) -> Result<()> {
// TODO: buffer these until initialized arrives
let response = self.request::<requests::Launch>(args).await?; let response = self.request::<requests::Launch>(args).await?;
log::error!("launch response {}", response); log::error!("launch response {}", response);
@ -257,8 +233,6 @@ impl Client {
} }
pub async fn attach(&mut self, args: serde_json::Value) -> Result<()> { pub async fn attach(&mut self, args: serde_json::Value) -> Result<()> {
// TODO: buffer these until initialized arrives
let response = self.request::<requests::Attach>(args).await?; let response = self.request::<requests::Attach>(args).await?;
log::error!("attach response {}", response); log::error!("attach response {}", response);

@ -34,7 +34,7 @@ num_cpus = "1"
tui = { path = "../helix-tui", package = "helix-tui", default-features = false, features = ["crossterm"] } tui = { path = "../helix-tui", package = "helix-tui", default-features = false, features = ["crossterm"] }
crossterm = { version = "0.20", features = ["event-stream"] } crossterm = { version = "0.20", features = ["event-stream"] }
signal-hook = "0.3" signal-hook = "0.3"
tokio-stream = "0.1"
futures-util = { version = "0.3", features = ["std", "async-await"], default-features = false } futures-util = { version = "0.3", features = ["std", "async-await"], default-features = false }
# Logging # Logging

@ -1,4 +1,5 @@
use helix_core::syntax; use helix_core::syntax;
use helix_dap::Payload;
use helix_lsp::{lsp, util::lsp_pos_to_pos, LspProgressMap}; use helix_lsp::{lsp, util::lsp_pos_to_pos, LspProgressMap};
use helix_view::{theme, Editor}; use helix_view::{theme, Editor};
@ -184,6 +185,29 @@ impl Application {
last_render = Instant::now(); last_render = Instant::now();
} }
} }
Some(payload) = self.editor.debugger_events.next() => {
let mut debugger = self.editor.debugger.as_mut().unwrap();
match payload {
Payload::Event(ev) => {
match &ev.event[..] {
"stopped" => {
let main = debugger
.threads()
.await
.ok()
.and_then(|threads| threads.get(0).cloned());
if let Some(main) = main {
let (bt, _) = debugger.stack_trace(main.id).await.unwrap();
debugger.stack_pointer = bt.get(0).cloned();
}
}
_ => {}
}
},
Payload::Response(_) => unreachable!(),
Payload::Request(_) => todo!(),
}
}
Some(callback) = self.jobs.futures.next() => { Some(callback) = self.jobs.futures.next() => {
self.jobs.handle_callback(&mut self.editor, &mut self.compositor, callback); self.jobs.handle_callback(&mut self.editor, &mut self.compositor, callback);
self.render(); self.render();

@ -24,17 +24,17 @@ use helix_lsp::{
}; };
use insert::*; use insert::*;
use movement::Movement; use movement::Movement;
use tokio::sync::{mpsc::Receiver, Mutex};
use crate::{ use crate::{
compositor::{self, Component, Compositor}, compositor::{self, Component, Compositor},
ui::{self, FilePicker, Picker, Popup, Prompt, PromptEvent}, ui::{self, FilePicker, Picker, Popup, Prompt, PromptEvent},
}; };
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::job::{self, Job, Jobs}; use crate::job::{self, Job, Jobs};
use futures_util::FutureExt; use futures_util::FutureExt;
use std::num::NonZeroUsize;
use std::{collections::HashMap, fmt, future::Future}; use std::{collections::HashMap, fmt, future::Future};
use std::{num::NonZeroUsize, sync::Arc};
use std::{ use std::{
borrow::Cow, borrow::Cow,
@ -4255,8 +4255,8 @@ fn dap_start(cx: &mut Context) {
// look up config for filetype // look up config for filetype
// if multiple available, open picker // if multiple available, open picker
let debugger = Client::tcp_process("dlv", vec!["dap"], "-l 127.0.0.1:{}", 0); let started = Client::tcp_process("dlv", vec!["dap"], "-l 127.0.0.1:{}", 0);
let mut debugger = block_on(debugger).unwrap(); let (mut debugger, events) = block_on(started).unwrap();
let request = debugger.initialize("go".to_owned()); let request = debugger.initialize("go".to_owned());
let _ = block_on(request).unwrap(); let _ = block_on(request).unwrap();
@ -4268,32 +4268,10 @@ fn dap_start(cx: &mut Context) {
let request = debugger.launch(to_value(args).unwrap()); let request = debugger.launch(to_value(args).unwrap());
let _ = block_on(request).unwrap(); let _ = block_on(request).unwrap();
let stopped = block_on(debugger.listen_for_event("stopped".to_owned()));
let debugger = Arc::new(Mutex::new(debugger));
tokio::spawn(dap_listen_stopped(stopped, Arc::clone(&debugger)));
// TODO: either await "initialized" or buffer commands until event is received // TODO: either await "initialized" or buffer commands until event is received
cx.editor.debugger = Some(debugger); cx.editor.debugger = Some(debugger);
} let stream = UnboundedReceiverStream::new(events);
cx.editor.debugger_events.push(stream);
async fn dap_listen_stopped(
mut stopped: Receiver<helix_dap::Event>,
debugger: Arc<Mutex<helix_dap::Client>>,
) {
loop {
stopped.recv().await;
let mut dbg = debugger.lock().await;
let main = dbg
.threads()
.await
.ok()
.and_then(|threads| threads.get(0).cloned());
if let Some(main) = main {
let (a, _) = dbg.stack_trace(main.id).await.unwrap();
dbg.stack_pointer = a.get(0).cloned();
}
}
} }
fn dap_toggle_breakpoint(cx: &mut Context) { fn dap_toggle_breakpoint(cx: &mut Context) {
@ -4321,7 +4299,6 @@ fn dap_toggle_breakpoint(cx: &mut Context) {
// we shouldn't really allow editing while debug is running though // we shouldn't really allow editing while debug is running though
if let Some(debugger) = &mut cx.editor.debugger { if let Some(debugger) = &mut cx.editor.debugger {
let mut debugger = block_on(debugger.lock());
let breakpoints = debugger.breakpoints.entry(path.clone()).or_default(); let breakpoints = debugger.breakpoints.entry(path.clone()).or_default();
if let Some(pos) = breakpoints.iter().position(|b| b.line == breakpoint.line) { if let Some(pos) = breakpoints.iter().position(|b| b.line == breakpoint.line) {
breakpoints.remove(pos); breakpoints.remove(pos);
@ -4340,7 +4317,6 @@ fn dap_run(cx: &mut Context) {
use helix_lsp::block_on; use helix_lsp::block_on;
if let Some(debugger) = &mut cx.editor.debugger { if let Some(debugger) = &mut cx.editor.debugger {
let mut debugger = block_on(debugger.lock());
let request = debugger.configuration_done(); let request = debugger.configuration_done();
let _ = block_on(request).unwrap(); let _ = block_on(request).unwrap();
} }

@ -25,8 +25,7 @@ use helix_view::{
keyboard::{KeyCode, KeyModifiers}, keyboard::{KeyCode, KeyModifiers},
Document, Editor, Theme, View, Document, Editor, Theme, View,
}; };
use std::{borrow::Cow, sync::Arc}; use std::borrow::Cow;
use tokio::sync::Mutex;
use crossterm::event::{Event, MouseButton, MouseEvent, MouseEventKind}; use crossterm::event::{Event, MouseButton, MouseEvent, MouseEventKind};
use tui::buffer::Buffer as Surface; use tui::buffer::Buffer as Surface;
@ -73,7 +72,7 @@ impl EditorView {
is_focused: bool, is_focused: bool,
loader: &syntax::Loader, loader: &syntax::Loader,
config: &helix_view::editor::Config, config: &helix_view::editor::Config,
debugger: Option<Arc<Mutex<helix_dap::Client>>>, debugger: &Option<helix_dap::Client>,
) { ) {
let inner = view.inner_area(); let inner = view.inner_area();
let area = view.area; let area = view.area;
@ -414,9 +413,8 @@ impl EditorView {
theme: &Theme, theme: &Theme,
is_focused: bool, is_focused: bool,
config: &helix_view::editor::Config, config: &helix_view::editor::Config,
debugger: Option<Arc<Mutex<helix_dap::Client>>>, debugger: &Option<helix_dap::Client>,
) { ) {
use helix_lsp::block_on;
let text = doc.text().slice(..); let text = doc.text().slice(..);
let last_line = view.last_line(doc); let last_line = view.last_line(doc);
@ -449,9 +447,8 @@ impl EditorView {
let mut stack_pointer: Option<StackFrame> = None; let mut stack_pointer: Option<StackFrame> = None;
if let Some(debugger) = debugger { if let Some(debugger) = debugger {
if let Some(path) = doc.path() { if let Some(path) = doc.path() {
let dbg = block_on(debugger.lock()); breakpoints = debugger.breakpoints.get(path).cloned();
breakpoints = dbg.breakpoints.get(path).cloned(); stack_pointer = debugger.stack_pointer.clone()
stack_pointer = dbg.stack_pointer.clone()
} }
} }
@ -1047,10 +1044,6 @@ impl Component for EditorView {
for (view, is_focused) in cx.editor.tree.views() { for (view, is_focused) in cx.editor.tree.views() {
let doc = cx.editor.document(view.doc).unwrap(); let doc = cx.editor.document(view.doc).unwrap();
let loader = &cx.editor.syn_loader; let loader = &cx.editor.syn_loader;
let mut dbg: Option<Arc<Mutex<helix_dap::Client>>> = None;
if let Some(debugger) = &cx.editor.debugger {
dbg = Some(Arc::clone(debugger));
}
self.render_view( self.render_view(
doc, doc,
view, view,
@ -1060,7 +1053,7 @@ impl Component for EditorView {
is_focused, is_focused,
loader, loader,
&cx.editor.config, &cx.editor.config,
dbg, &cx.editor.debugger,
); );
} }

@ -8,7 +8,6 @@ use crate::{
use futures_util::future; use futures_util::future;
use futures_util::stream::select_all::SelectAll; use futures_util::stream::select_all::SelectAll;
use tokio::sync::Mutex;
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use std::{ use std::{
@ -75,8 +74,8 @@ pub struct Editor {
pub theme: Theme, pub theme: Theme,
pub language_servers: helix_lsp::Registry, pub language_servers: helix_lsp::Registry,
pub debugger: Option<Arc<Mutex<helix_dap::Client>>>, pub debugger: Option<helix_dap::Client>,
pub debuggers: SelectAll<UnboundedReceiverStream<helix_dap::Payload>>, pub debugger_events: SelectAll<UnboundedReceiverStream<helix_dap::Payload>>,
pub clipboard_provider: Box<dyn ClipboardProvider>, pub clipboard_provider: Box<dyn ClipboardProvider>,
@ -116,7 +115,7 @@ impl Editor {
theme: themes.default(), theme: themes.default(),
language_servers, language_servers,
debugger: None, debugger: None,
debuggers: SelectAll::new(), debugger_events: SelectAll::new(),
syn_loader: config_loader, syn_loader: config_loader,
theme_loader: themes, theme_loader: themes,
registers: Registers::default(), registers: Registers::default(),

Loading…
Cancel
Save