use single event loop to control logic

pull/5/head v0.1.2-alpha
Patrick Cleavelin 2023-02-20 19:12:57 -06:00
parent 0e6bc2d3dd
commit 541d0fd70e
2 changed files with 155 additions and 91 deletions

View File

@ -35,7 +35,7 @@
packages = with pkgs; flake-utils.lib.flattenTree rec {
default = rustPlatform.buildRustPackage rec {
name = "memejoin-rs";
version = "0.1.1-alpha";
version = "0.1.2-alpha";
src = self;
nativeBuildInputs = [ local-rust cmake gcc libopus ];
@ -46,7 +46,7 @@
docker = dockerTools.buildImage {
name = "memejoin-rs";
tag = "0.1.1.alpha";
tag = "0.1.2-alpha";
copyToRoot = buildEnv {
name = "image-root";
paths = [ default ffmpeg libopus youtube-dl ];

View File

@ -2,23 +2,33 @@
#![feature(proc_macro_hygiene)]
#![feature(async_closure)]
use songbird::tracks::TrackQueue;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use tokio::sync::mpsc;
use serde::Deserialize;
use serenity::async_trait;
use serenity::model::prelude::{Channel, GuildId, Ready};
use serenity::model::prelude::{Channel, ChannelId, GuildId, Member, Ready};
use serenity::model::voice::VoiceState;
use serenity::prelude::GatewayIntents;
use serenity::prelude::*;
use songbird::SerenityInit;
use tracing::*;
struct Handler;
enum HandlerMessage {
Ready(Context),
PlaySound(Context, Member, ChannelId),
TrackEnded(GuildId),
}
struct Handler {
tx: std::sync::Mutex<mpsc::Sender<HandlerMessage>>,
}
struct TrackEventHandler {
ctx: Context,
tx: mpsc::Sender<HandlerMessage>,
guild_id: GuildId,
}
@ -28,36 +38,35 @@ impl songbird::EventHandler for TrackEventHandler {
&'a self,
ctx: &'b songbird::EventContext<'c>,
) -> Option<songbird::Event> {
if let songbird::EventContext::Track(tracks) = ctx {
for (track_state, _track_handle) in tracks.iter() {
// If any track is still playing, don't leave the channel yet
// there are more sounds to be played (I think)
if track_state.playing == songbird::tracks::PlayMode::Play {
return None;
}
if let songbird::EventContext::Track(_) = ctx {
if let Err(err) = self
.tx
.send(HandlerMessage::TrackEnded(self.guild_id))
.await
{
error!("Failed to send track end message to handler: {err}");
}
}
let manager = songbird::get(&self.ctx).await.expect("should get manager");
if let Err(err) = manager.leave(self.guild_id).await {
error!("Failed to leave voice channel: {err:?}");
}
None
}
}
#[derive(Debug, Clone, Deserialize)]
struct Settings {
guilds: HashMap<u64, GuildSettings>,
}
impl TypeMapKey for Settings {
type Value = Arc<Settings>;
}
#[derive(Debug, Clone, Deserialize)]
struct GuildSettings {
#[serde(alias = "userEnteredSoundDelay")]
_sound_delay: u64,
channels: HashMap<String, ChannelSettings>,
}
impl TypeMapKey for Settings {
type Value = Arc<Settings>;
}
#[derive(Debug, Clone, Deserialize)]
struct ChannelSettings {
#[serde(alias = "enterUsers")]
@ -85,7 +94,17 @@ enum SoundType {
#[async_trait]
impl EventHandler for Handler {
async fn ready(&self, _ctx: Context, ready: Ready) {
async fn ready(&self, ctx: Context, ready: Ready) {
let tx = self
.tx
.lock()
.expect("failed to get message sender lock")
.clone();
tx.send(HandlerMessage::Ready(ctx))
.await
.unwrap_or_else(|err| panic!("failed to send ready message to handler: {err}"));
info!("{} is ready", ready.user.name);
}
@ -107,68 +126,17 @@ impl EventHandler for Handler {
.unwrap_or("no_guild_name".to_string())
);
let settings = {
let data_read = ctx.data.read().await;
let tx = self
.tx
.lock()
.expect("couldn't get lock for Handler messenger")
.clone();
data_read
.get::<Settings>()
.expect("settings should exist")
.clone()
};
let Some(Channel::Guild(channel)) = channel_id.to_channel_cached(&ctx.cache) else {
error!("Failed to get cached channel from member!");
return;
};
let Some(user) = settings.channels.get(channel.name()).and_then(|c| c.users.get(&member.user.name)) else {
info!("No sound associated for {} in channel {}", member.user.name, channel.name());
return;
};
let Some(manager) = songbird::get(&ctx).await else {
error!("Failed to get songbird manager from context");
return;
};
match manager.join(member.guild_id, channel_id).await {
(handler_lock, Ok(())) => {
let mut handler = handler_lock.lock().await;
let source = match user.ty {
SoundType::Youtube => match songbird::ytdl(&user.sound).await {
Ok(source) => source,
Err(err) => {
error!("Error starting youtube source: {err:?}");
return;
}
},
SoundType::File => {
match songbird::ffmpeg(format!("sounds/{}", &user.sound)).await {
Ok(source) => source,
Err(err) => {
error!("Error starting file source: {err:?}");
return;
}
}
}
};
let track_handler = handler.enqueue_source(source);
if let Err(err) = track_handler.add_event(
songbird::Event::Track(songbird::TrackEvent::End),
TrackEventHandler {
ctx,
guild_id: member.guild_id,
},
) {
error!("Failed to add event handler to track handle: {err:?}");
};
}
(_, Err(err)) => {
error!("Failed to join voice channel {}: {err:?}", channel.name());
}
if let Err(err) = tx
.send(HandlerMessage::PlaySound(ctx, member, channel_id))
.await
{
error!("Failed to send play sound message to handler: {err}");
}
}
}
@ -189,22 +157,22 @@ async fn main() {
info!("{settings:?}");
let songbird = songbird::Songbird::serenity();
let (tx, mut rx) = mpsc::channel(10);
let intents = GatewayIntents::GUILDS
| GatewayIntents::GUILD_MESSAGES
| GatewayIntents::MESSAGE_CONTENT
| GatewayIntents::GUILD_VOICE_STATES;
let mut client = Client::builder(&token, intents)
.event_handler(Handler)
.register_songbird()
.event_handler(Handler {
tx: std::sync::Mutex::new(tx.clone()),
})
.register_songbird_with(songbird.clone())
.await
.expect("Error creating client");
{
let mut data = client.data.write().await;
data.insert::<Settings>(Arc::new(settings));
}
info!("Starting bot with token '{token}'");
tokio::spawn(async move {
if let Err(err) = client.start().await {
@ -212,6 +180,102 @@ async fn main() {
}
});
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
match msg {
HandlerMessage::Ready(ctx) => {
info!("Got Ready message");
let songbird = songbird::get(&ctx).await.expect("no songbird instance");
for guild_id in settings.guilds.keys() {
let handler_lock = songbird.get_or_insert(GuildId(*guild_id));
let mut handler = handler_lock.lock().await;
handler.add_global_event(
songbird::Event::Track(songbird::TrackEvent::End),
TrackEventHandler {
tx: tx.clone(),
guild_id: GuildId(*guild_id),
},
);
}
}
HandlerMessage::TrackEnded(guild_id) => {
info!("Got TrackEnded message");
if let Some(manager) = songbird.get(guild_id) {
let mut handler = manager.lock().await;
let queue = handler.queue();
if queue.is_empty() {
info!("Track Queue is empty, leaving voice channel");
if let Err(err) = handler.leave().await {
error!("Failed to leave channel: {err:?}");
}
}
}
}
HandlerMessage::PlaySound(ctx, member, channel_id) => {
info!("Got PlaySound message");
let Some(Channel::Guild(channel)) = channel_id.to_channel_cached(&ctx.cache) else {
error!("Failed to get cached channel from member!");
continue;
};
let Some(user) = settings.guilds.get(channel.guild_id.as_u64())
.and_then(|guild| guild.channels.get(channel.name()))
.and_then(|c| c.users.get(&member.user.name))
else {
info!("No sound associated for {} in channel {}", member.user.name, channel.name());
continue;
};
let source = match user.ty {
SoundType::Youtube => match songbird::ytdl(&user.sound).await {
Ok(source) => source,
Err(err) => {
error!(
"Error starting youtube source from {}: {err:?}",
user.sound
);
continue;
}
},
SoundType::File => {
match songbird::ffmpeg(format!("sounds/{}", &user.sound)).await {
Ok(source) => source,
Err(err) => {
error!(
"Error starting file source from {}: {err:?}",
user.sound
);
continue;
}
}
}
};
match songbird.join(member.guild_id, channel_id).await {
(handler_lock, Ok(())) => {
let mut handler = handler_lock.lock().await;
let _track_handler = handler.enqueue_source(source);
// TODO: set volume
}
(_, Err(err)) => {
error!("Failed to join voice channel {}: {err:?}", channel.name());
}
}
}
}
}
});
let _ = tokio::signal::ctrl_c().await;
info!("Received Ctrl-C, shuttdown down.");
}