merged with master

This commit is contained in:
badaix 2016-11-17 15:47:23 +01:00
commit 8bd5361081
64 changed files with 1481 additions and 275 deletions

View file

@ -105,22 +105,27 @@ The goal is to build the following chain:
audio player software -> snapfifo -> snapserver -> network -> snapclient -> alsa
This [guide](doc/player_setup.md) shows how to configure different players/audio sources to redirect their audio signal into the Snapserver's fifo:
* [MPD](doc/player_setup.md#mpd-setup)
* [Mopidy](doc/player_setup.md#mopidy-setup)
* [MPlayer](doc/player_setup.md#mplayer-setup)
* [Alsa](doc/player_setup.md#alsa-setup)
* [PulseAudio](doc/player_setup.md#pulseaudio-setup)
* [MPD](doc/player_setup.md#mpd)
* [Mopidy](doc/player_setup.md#mopidy)
* [FFmpeg](doc/player_setup.md#ffmpeg)
* [MPlayer](doc/player_setup.md#mplayer)
* [Alsa](doc/player_setup.md#alsa)
* [PulseAudio](doc/player_setup.md#pulseaudio)
* [AirPlay](doc/player_setup.md#airplay)
* [Spotify](doc/player_setup.md#spotify)
* [Process](doc/player_setup.md#process)
Roadmap
-------
Unordered list of features that should make it into the v1.0
- [X] **Remote control** JSON-RPC API to change client latency, volume, zone, ...
- [X] **Android client** JSON-RPC client and Snapclient
- [X] **Zones** Support multiple streams
- [X] **Streams** Support multiple streams
- [X] **Debian packages** prebuild deb packages
- [X] **Endian** independent code
- [X] **OpenWrt** port Snapclient to OpenWrt
- [X] **Hi-Res audio** support (like 192kHz 24bit)
- [ ] **Groups** support multiple Groups of clients ("Zones")
- [ ] **JSON-RPC** Possibility to add, remove, rename streams
- [ ] **Protocol specification** Snapcast binary streaming protocol, JSON-RPC protocol
- [ ] **Ports** Snapclient for Windows, Mac OS X, ...

View file

@ -8,8 +8,8 @@ android {
applicationId "de.badaix.snapcast"
minSdkVersion 16
targetSdkVersion 23
versionCode 900
versionName "0.9.0"
versionCode 1000
versionName "0.10.0"
multiDexEnabled true
}
buildTypes {

View file

@ -1,6 +1,10 @@
VERSION = 0.9.0
VERSION = 0.10.0
TARGET = snapclient
SHELL = /bin/bash
ifeq ($(TARGET), FREEBSD)
SHELL = /usr/local/bin/bash
else
SHELL = /bin/bash
endif
ifdef DESTDIR
# dh_auto_install (Debian) sets this variable
@ -69,7 +73,7 @@ clean:
.PHONY: dpkg
#sudo apt-get install build-essential debhelper dh-make dh-systemd quilt fakeroot lintian
dpkg:
dpkg-buildpackage -rfakeroot -b
dpkg-buildpackage -rfakeroot -b -uc -us
dh_clean
ifdef DESTDIR

View file

@ -22,7 +22,6 @@
#include <assert.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <iostream>
#include "common/snapException.h"
#include "common/log.h"

View file

@ -42,7 +42,6 @@ ClientConnection::~ClientConnection()
void ClientConnection::socketRead(void* _to, size_t _bytes)
{
// std::unique_lock<std::mutex> mlock(mutex_);
size_t toRead = _bytes;
size_t len = 0;
do
@ -123,6 +122,7 @@ bool ClientConnection::send(const msg::BaseMessage* message) const
{
// std::unique_lock<std::mutex> mlock(mutex_);
//logD << "send: " << message->type << ", size: " << message->getSize() << "\n";
std::lock_guard<std::mutex> socketLock(socketMutex_);
if (!connected())
return false;
//logD << "send: " << message->type << ", size: " << message->getSize() << "\n";
@ -145,13 +145,10 @@ shared_ptr<msg::SerializedMessage> ClientConnection::sendRequest(const msg::Base
// logO << "Req: " << message->id << "\n";
shared_ptr<PendingRequest> pendingRequest(new PendingRequest(reqId_));
{
std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests_.insert(pendingRequest);
}
std::unique_lock<std::mutex> lck(requestMutex_);
std::unique_lock<std::mutex> lock(pendingRequestsMutex_);
pendingRequests_.insert(pendingRequest);
send(message);
if (pendingRequest->cv.wait_for(lck, std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout)
if (pendingRequest->cv.wait_for(lock, std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout)
{
response = pendingRequest->response;
sumTimeout_ = chronos::msec(0);
@ -164,10 +161,7 @@ shared_ptr<msg::SerializedMessage> ClientConnection::sendRequest(const msg::Base
if (sumTimeout_ > chronos::sec(10))
throw SnapException("sum timeout exceeded 10s");
}
{
std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests_.erase(pendingRequest);
}
pendingRequests_.erase(pendingRequest);
return response;
}
@ -182,12 +176,15 @@ void ClientConnection::getNextMessage()
// logD << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n";
if (baseMessage.size > buffer.size())
buffer.resize(baseMessage.size);
// {
// std::lock_guard<std::mutex> socketLock(socketMutex_);
socketRead(&buffer[0], baseMessage.size);
// }
tv t;
baseMessage.received = t;
{
std::unique_lock<std::mutex> mlock(mutex_);
std::unique_lock<std::mutex> lock(pendingRequestsMutex_);
// logD << "got lock - getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n";
{
for (auto req: pendingRequests_)
@ -198,7 +195,7 @@ void ClientConnection::getNextMessage()
req->response->message = baseMessage;
req->response->buffer = (char*)malloc(baseMessage.size);
memcpy(req->response->buffer, &buffer[0], baseMessage.size);
std::unique_lock<std::mutex> lck(requestMutex_);
lock.unlock();
req->cv.notify_one();
return;
}

View file

@ -108,12 +108,12 @@ protected:
void getNextMessage();
asio::io_service io_service_;
mutable std::mutex socketMutex_;
std::shared_ptr<tcp::socket> socket_;
std::atomic<bool> active_;
std::atomic<bool> connected_;
MessageReceiver* messageReceiver_;
mutable std::mutex mutex_;
mutable std::mutex requestMutex_;
mutable std::mutex pendingRequestsMutex_;
std::set<std::shared_ptr<PendingRequest>> pendingRequests_;
uint16_t reqId_;
std::string host_;

View file

@ -92,7 +92,7 @@ void Controller::onMessageReceived(ClientConnection* connection, const msg::Base
logO << "Codec: " << headerChunk_->codec << "\n";
decoder_.reset(nullptr);
stream_.reset(nullptr);
stream_ = nullptr;
player_.reset(nullptr);
if (headerChunk_->codec == "pcm")
@ -109,15 +109,15 @@ void Controller::onMessageReceived(ClientConnection* connection, const msg::Base
sampleFormat_ = decoder_->setHeader(headerChunk_.get());
logState << "sampleformat: " << sampleFormat_.rate << ":" << sampleFormat_.bits << ":" << sampleFormat_.channels << "\n";
stream_.reset(new Stream(sampleFormat_));
stream_ = make_shared<Stream>(sampleFormat_);
stream_->setBufferLen(serverSettings_->getBufferMs() - latency_);
#ifdef HAS_ALSA
player_.reset(new AlsaPlayer(pcmDevice_, stream_.get()));
player_.reset(new AlsaPlayer(pcmDevice_, stream_));
#elif HAS_OPENSL
player_.reset(new OpenslPlayer(pcmDevice_, stream_.get()));
player_.reset(new OpenslPlayer(pcmDevice_, stream_));
#elif HAS_COREAUDIO
player_.reset(new CoreAudioPlayer(pcmDevice_, stream_.get()));
player_.reset(new CoreAudioPlayer(pcmDevice_, stream_));
#else
throw SnapException("No audio player support");
#endif
@ -184,7 +184,7 @@ void Controller::worker()
if (reply)
{
TimeProvider::getInstance().setDiff(reply->latency, reply->received - reply->sent);
usleep(100);
chronos::usleep(100);
}
}
logO << "diff to server [ms]: " << (float)TimeProvider::getInstance().getDiffToServer<chronos::usec>().count() / 1000.f << "\n";
@ -193,7 +193,7 @@ void Controller::worker()
{
for (size_t n=0; n<10 && active_; ++n)
{
usleep(100*1000);
chronos::sleep(100);
if (asyncException_)
throw AsyncSnapException(exception_);
}
@ -211,7 +211,7 @@ void Controller::worker()
stream_.reset();
decoder_.reset();
for (size_t n=0; (n<10) && active_; ++n)
usleep(100*1000);
chronos::sleep(100);
}
}
logD << "Thread stopped\n";

View file

@ -66,7 +66,7 @@ private:
PcmDevice pcmDevice_;
int latency_;
std::unique_ptr<ClientConnection> clientConnection_;
std::unique_ptr<Stream> stream_;
std::shared_ptr<Stream> stream_;
std::unique_ptr<Decoder> decoder_;
std::unique_ptr<Player> player_;
std::shared_ptr<msg::ServerSettings> serverSettings_;

View file

@ -1,3 +1,18 @@
snapclient (0.10.0) unstable; urgency=low
* Features
-Added support "process" streams:
Snapserver starts a process and reads PCM data from stdout
-Specialized versions for Spotify "spotify" and AirPlay "airplay"
* Bugfixes
-Fixed crash during server shutdown
-Fixed Makefile for FreeBSD
-Fixed building of dpk (unsigned .changes file)
* General
-Speed up client and server shutdown
-- Johannes Pohl <johannes.pohl@badaix.de> Wed, 16 Nov 2016 00:13:37 +0200
snapclient (0.9.0) unstable; urgency=low
* Features

View file

@ -7,7 +7,7 @@
<key>ProgramArguments</key>
<array>
<string>/usr/local/bin/snapclient</string>
<string>-d</string>
<!-- <string>-d</string> -->
</array>
<key>RunAtLoad</key>
<true/>

View file

@ -19,7 +19,6 @@
#include <iostream>
#include <cstring>
#include <cmath>
#include <FLAC/stream_decoder.h>
#include "flacDecoder.h"
#include "common/snapException.h"
#include "common/endian.h"
@ -43,7 +42,7 @@ static FLAC__StreamDecoder *decoder = NULL;
FlacDecoder::FlacDecoder() : Decoder()
FlacDecoder::FlacDecoder() : Decoder(), lastError_(nullptr)
{
flacChunk = new msg::PcmChunk();
}
@ -69,7 +68,19 @@ bool FlacDecoder::decode(msg::PcmChunk* chunk)
pcmChunk->payload = (char*)realloc(pcmChunk->payload, 0);
pcmChunk->payloadSize = 0;
while (flacChunk->payloadSize > 0)
FLAC__stream_decoder_process_single(decoder);
{
if (!FLAC__stream_decoder_process_single(decoder))
{
return false;
}
if (lastError_)
{
logE << "FLAC decode error: " << FLAC__StreamDecoderErrorStatusString[*lastError_] << "\n";
lastError_= nullptr;
return false;
}
}
if ((cacheInfo_.cachedBlocks_ > 0) && (cacheInfo_.sampleRate_ != 0))
{
@ -116,10 +127,14 @@ FLAC__StreamDecoderReadStatus read_callback(const FLAC__StreamDecoder *decoder,
}
else if (flacChunk != NULL)
{
// cerr << "read_callback: " << *bytes << ", avail: " << flacChunk->payloadSize << "\n";
static_cast<FlacDecoder*>(client_data)->cacheInfo_.isCachedChunk_ = false;
if (*bytes > flacChunk->payloadSize)
*bytes = flacChunk->payloadSize;
// if (*bytes == 0)
// return FLAC__STREAM_DECODER_READ_STATUS_END_OF_STREAM;
memcpy(buffer, flacChunk->payload, *bytes);
memmove(flacChunk->payload, flacChunk->payload + *bytes, flacChunk->payloadSize - *bytes);
flacChunk->payloadSize = flacChunk->payloadSize - *bytes;
@ -196,6 +211,17 @@ void error_callback(const FLAC__StreamDecoder *decoder, FLAC__StreamDecoderError
{
(void)decoder, (void)client_data;
logS(kLogErr) << "Got error callback: " << FLAC__StreamDecoderErrorStatusString[status] << "\n";
static_cast<FlacDecoder*>(client_data)->lastError_ = std::unique_ptr<FLAC__StreamDecoderErrorStatus>(new FLAC__StreamDecoderErrorStatus(status));
/// TODO, see issue #120:
// Thu Nov 10 07:26:44 2016 daemon.warn dnsmasq-dhcp[1194]: no address range available for DHCP request via wlan0
// Thu Nov 10 07:54:39 2016 daemon.err snapclient[1158]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_LOST_SYNC
// Thu Nov 10 07:54:39 2016 daemon.err snapclient[1158]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_LOST_SYNC
//
// and:
// Oct 27 17:37:38 kitchen snapclient[869]: Connected to 192.168.222.10
// Oct 27 17:47:13 kitchen snapclient[869]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_UNPARSEABLE_STREAM
// Oct 27 17:47:13 kitchen snapclient[869]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_LOST_SYNC
}

View file

@ -21,6 +21,11 @@
#include "decoder.h"
#include <FLAC/stream_decoder.h>
#include <atomic>
#include <memory>
struct CacheInfo
{
@ -50,6 +55,7 @@ public:
virtual SampleFormat setHeader(msg::CodecHeader* chunk);
CacheInfo cacheInfo_;
std::unique_ptr<FLAC__StreamDecoderErrorStatus> lastError_;
};

View file

@ -26,7 +26,7 @@
using namespace std;
AlsaPlayer::AlsaPlayer(const PcmDevice& pcmDevice, Stream* stream) :
AlsaPlayer::AlsaPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream) :
Player(pcmDevice, stream), handle_(NULL), buff_(NULL)
{
}
@ -198,7 +198,7 @@ void AlsaPlayer::worker()
{
snd_pcm_sframes_t pcm;
snd_pcm_sframes_t framesDelay;
long lastChunkTick = 0;
long lastChunkTick = chronos::getTickCount();
while (active_)
{
@ -211,7 +211,7 @@ void AlsaPlayer::worker()
catch (const std::exception& e)
{
logE << "Exception in initAlsa: " << e.what() << endl;
usleep(100*1000);
chronos::sleep(100);
}
}

View file

@ -30,7 +30,7 @@
class AlsaPlayer : public Player
{
public:
AlsaPlayer(const PcmDevice& pcmDevice, Stream* stream);
AlsaPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream);
virtual ~AlsaPlayer();
/// Set audio volume in range [0..1]

View file

@ -16,7 +16,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include <unistd.h>
#include "coreAudioPlayer.h"
#define NUM_BUFFERS 2
@ -32,7 +31,7 @@ void callback(void *custom_data, AudioQueueRef queue, AudioQueueBufferRef buffer
}
CoreAudioPlayer::CoreAudioPlayer(const PcmDevice& pcmDevice, Stream* stream) :
CoreAudioPlayer::CoreAudioPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream) :
Player(pcmDevice, stream),
ms_(100),
pubStream_(stream)

View file

@ -37,7 +37,7 @@
class CoreAudioPlayer : public Player
{
public:
CoreAudioPlayer(const PcmDevice& pcmDevice, Stream* stream);
CoreAudioPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream);
virtual ~CoreAudioPlayer();
void playerCallback(AudioQueueRef queue, AudioQueueBufferRef bufferRef);
@ -49,7 +49,7 @@ protected:
size_t ms_;
size_t frames_;
size_t buff_size_;
Stream* pubStream_;
std::shared_ptr<Stream> pubStream_;
};

View file

@ -16,7 +16,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include <unistd.h>
#include <assert.h>
#include <iostream>
@ -44,7 +43,7 @@ static void bqPlayerCallback(SLAndroidSimpleBufferQueueItf bq, void *context)
OpenslPlayer::OpenslPlayer(const PcmDevice& pcmDevice, Stream* stream) :
OpenslPlayer::OpenslPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream) :
Player(pcmDevice, stream),
engineObject(NULL),
engineEngine(NULL),
@ -122,7 +121,7 @@ void OpenslPlayer::playerCallback(SLAndroidSimpleBufferQueueItf bq)
{
SLresult result = (*bq)->Enqueue(bq, buffer[curBuffer], buff_size);
if (result == SL_RESULT_BUFFER_INSUFFICIENT)
usleep(1000);
chronos::sleep(1);
else
break;
}

View file

@ -35,7 +35,7 @@ typedef int (*AndroidAudioCallback)(short *buffer, int num_samples);
class OpenslPlayer : public Player
{
public:
OpenslPlayer(const PcmDevice& pcmDevice, Stream* stream);
OpenslPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream);
virtual ~OpenslPlayer();
virtual void start();
@ -69,7 +69,7 @@ protected:
size_t ms_;
size_t frames_;
size_t buff_size;
Stream* pubStream_;
std::shared_ptr<Stream> pubStream_;
};

View file

@ -26,7 +26,7 @@
using namespace std;
Player::Player(const PcmDevice& pcmDevice, Stream* stream) :
Player::Player(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream) :
active_(false),
stream_(stream),
pcmDevice_(pcmDevice),

View file

@ -36,7 +36,7 @@
class Player
{
public:
Player(const PcmDevice& pcmDevice, Stream* stream);
Player(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream);
virtual ~Player();
/// Set audio volume in range [0..1]
@ -59,7 +59,7 @@ protected:
void adjustVolume(char* buffer, size_t frames);
std::atomic<bool> active_;
Stream* stream_;
std::shared_ptr<Stream> stream_;
std::thread playerThread_;
PcmDevice pcmDevice_;
double volume_;

View file

@ -192,7 +192,7 @@ int main (int argc, char **argv)
{
logS(kLogErr) << "Exception: " << e.what() << std::endl;
}
usleep(500*1000);
chronos::sleep(500);
}
#endif
}
@ -203,7 +203,7 @@ int main (int argc, char **argv)
logO << "Latency: " << latency << "\n";
controller->start(pcmDevice, host, port, latency);
while(!g_terminated)
usleep(100*1000);
chronos::sleep(100);
controller->stop();
}
}

View file

@ -76,20 +76,13 @@ void Stream::addChunk(msg::PcmChunk* chunk)
while (chunks_.size() * chunk->duration<cs::msec>().count() > 10000)
chunks_.pop();
chunks_.push(shared_ptr<msg::PcmChunk>(chunk));
std::unique_lock<std::mutex> lck(cvMutex_);
cv_.notify_one();
// logD << "new chunk: " << chunk->duration<cs::msec>().count() << ", Chunks: " << chunks_.size() << "\n";
}
bool Stream::waitForChunk(size_t ms) const
{
if (!chunks_.empty())
return true;
std::unique_lock<std::mutex> lck(cvMutex_);
cv_.wait_for(lck, std::chrono::milliseconds(ms));
return !chunks_.empty();
return chunks_.wait_for(std::chrono::milliseconds(ms));
}

View file

@ -89,9 +89,6 @@ private:
unsigned long playedFrames_;
long correctAfterXFrames_;
chronos::msec bufferMs_;
mutable std::condition_variable cv_;
mutable std::mutex cvMutex_;
};

View file

@ -19,7 +19,8 @@
#ifndef QUEUE_H
#define QUEUE_H
#include <queue>
#include <deque>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
@ -33,11 +34,11 @@ public:
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
// std::lock_guard<std::mutex> lock(mutex_);
auto val = queue_.front();
queue_.pop();
queue_.pop_front();
return val;
}
@ -50,15 +51,37 @@ public:
return queue_.front();
}
void abort_wait()
{
{
std::lock_guard<std::mutex> mlock(mutex_);
abort_ = true;
}
cond_.notify_one();
}
bool wait_for(std::chrono::milliseconds timeout) const
{
std::unique_lock<std::mutex> mlock(mutex_);
abort_ = false;
if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); }))
return false;
return !queue_.empty() && !abort_;
}
bool try_pop(T& item, std::chrono::microseconds timeout)
{
std::unique_lock<std::mutex> mlock(mutex_);
abort_ = false;
if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); }))
return false;
if(!cond_.wait_for(mlock, timeout, [this] { return !queue_.empty(); }))
if (queue_.empty() || abort_)
return false;
item = std::move(queue_.front());
queue_.pop();
queue_.pop_front();
return true;
}
@ -72,32 +95,51 @@ public:
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
item = queue_.front();
queue_.pop();
queue_.pop_front();
}
void push_front(const T& item)
{
{
std::lock_guard<std::mutex> mlock(mutex_);
queue_.push_front(item);
}
cond_.notify_one();
}
void push_front(T&& item)
{
{
std::lock_guard<std::mutex> mlock(mutex_);
queue_.push_front(std::move(item));
}
cond_.notify_one();
}
void push(const T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(item);
mlock.unlock();
{
std::lock_guard<std::mutex> mlock(mutex_);
queue_.push_back(item);
}
cond_.notify_one();
}
void push(T&& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(std::move(item));
mlock.unlock();
{
std::lock_guard<std::mutex> mlock(mutex_);
queue_.push_back(std::move(item));
}
cond_.notify_one();
}
size_t size() const
{
std::unique_lock<std::mutex> mlock(mutex_);
std::lock_guard<std::mutex> mlock(mutex_);
return queue_.size();
}
@ -111,9 +153,10 @@ public:
Queue& operator=(const Queue&) = delete; // disable assignment
private:
std::queue<T> queue_;
std::deque<T> queue_;
mutable std::atomic<bool> abort_;
mutable std::mutex mutex_;
std::condition_variable cond_;
mutable std::condition_variable cond_;
};

View file

@ -20,6 +20,7 @@
#define TIME_DEFS_H
#include <chrono>
#include <thread>
#include <sys/time.h>
#ifdef MACOS
#include <mach/clock.h>
@ -78,6 +79,27 @@ namespace chronos
{
return std::chrono::duration_cast<ToDuration>(d).count();
}
/// some sleep functions. Just for convenience.
template< class Rep, class Period >
inline void sleep(const std::chrono::duration<Rep, Period>& sleep_duration)
{
std::this_thread::sleep_for(sleep_duration);
}
inline void sleep(const int32_t& milliseconds)
{
if (milliseconds < 0)
return;
sleep(msec(milliseconds));
}
inline void usleep(const int32_t& microseconds)
{
if (microseconds < 0)
return;
sleep(usec(microseconds));
}
}

View file

@ -7,7 +7,7 @@ The goal is to build the following chain:
audio player software -> snapfifo -> snapserver -> network -> snapclient -> alsa
###MPD setup
###MPD
To connect [MPD](http://www.musicpd.org/) to the Snapserver, edit `/etc/mpd.conf`, so that mpd will feed the audio into the snapserver's named pipe
Disable alsa audio output by commenting out this section:
@ -38,19 +38,24 @@ To test your mpd installation, you can add a radio station by
$ sudo su
$ echo "http://1live.akacast.akamaistream.net/7/706/119434/v1/gnl.akacast.akamaistream.net/1live" > /var/lib/mpd/playlists/einslive.m3u
###Mopidy setup
###Mopidy
[Mopidy](https://www.mopidy.com/) can stream the audio output into the Snapserver's fifo with a `filesink` as audio output in `mopidy.conf`:
[audio]
#output = autoaudiosink
output = audioresample ! audioconvert ! audio/x-raw,rate=48000,channels=2,format=S16LE ! wavenc ! filesink location=/tmp/snapfifo
###MPlayer setup
###FFmpeg
Pipe FFmpeg's audio output to the snapfifo:
ffmpeg -y -i http://wms-15.streamsrus.com:11630 -f u16le -acodec pcm_s16le -ac 2 -ar 48000 /tmp/snapfifo
###MPlayer
Use `-novideo` and `-ao` to pipe MPlayer's audio output to the snapfifo:
mplayer http://wms-15.streamsrus.com:11630 -novideo -channels 2 -srate 48000 -af format=s16le -ao pcm:file=/tmp/snapfifo
###Alsa setup
###Alsa
If the player cannot be configured to route the audio stream into the snapfifo, Alsa or PulseAudio can be redirected, resulting in this chain:
audio player software -> Alsa -> Alsa file plugin -> snapfifo -> snapserver -> network -> snapclient -> Alsa
@ -80,7 +85,7 @@ pcm.writeFile {
}
```
###PulseAudio setup
###PulseAudio
Redirect the PulseAudio stream into the snapfifo:
audio player software -> PulseAudio -> PulsaAudio pipe sink -> snapfifo -> snapserver -> network -> snapclient -> Alsa
@ -93,3 +98,23 @@ Load the module `pipe-sink` like this:
pacmd update-sink-proplist Snapcast device.description=Snapcast
It might me neccessary to set the pulse audio latency environment variable to 60 msec: `PULSE_LATENCY_MSEC=60`
###AirPlay
Snapserver supports [shairport-sync](https://github.com/mikebrady/shairport-sync) with `stdout` backend.
1. Build shairport-sync with `stdout` backend: `./configure --with-stdout --with-avahi --with-ssl=openssl --with-metadata`
2. Copy the `shairport-sync` binary somewhere to your `PATH`, e.g. `/usr/local/bin/`
3. Configure snapserver with `-s "airplay:///shairport-sync?name=Airplay[&devicename=Snapcast][&port=5000]"`
###Spotify
Snapserver supports [librespot](https://github.com/badaix/librespot) with `stdout` backend.
1. Build `librespot` with `stdout` backend: `cargo build --features stdout-backend`
2. Copy the `librespot` binary somewhere to your `PATH`, e.g. `/usr/local/bin/`
3. Configure snapserver with `-s "spotify:///librespot?name=Spotify&username=<my username>&password=<my password>[&devicename=Snapcast][&bitrate=320]"`
###Process
Snapserver can start any process and read PCM data from stdout:
Configure snapserver with `-s "process:///path/to/process?name=Process[&params=<--my list --of params>][&logStderr=false]"`

View file

@ -1,10 +1,9 @@
*TODO:
-Server ping client?
-Server: OnResync while terminating?!?
-LastSeen: relative time [s] or [ms]?
-Android crash: Empty latency => app restart => empty client list
-Segfault when ^c on OpenWrt with client connected
-Android clean data structures after changing the Server
*JSON RPC:
curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc": "2.0", "method": "Application.SetVolume", "params": {"volume":100}, "id": 1}' http://i3c.pla.lcl:8080/jsonrpc

View file

@ -9,7 +9,7 @@ include $(TOPDIR)/rules.mk
include $(INCLUDE_DIR)/target.mk
PKG_NAME := snapcast
PKG_VERSION := 0.9.0
PKG_VERSION := 0.10.0
PKG_RELEASE := $(PKG_SOURCE_VERSION)
PKG_USE_MIPS16 := 0

View file

@ -1,6 +1,10 @@
VERSION = 0.9.0
VERSION = 0.10.0
TARGET = snapserver
SHELL = /bin/bash
ifeq ($(TARGET), FREEBSD)
SHELL = /usr/local/bin/bash
else
SHELL = /bin/bash
endif
ifdef DESTDIR
# dh_auto_install (Debian) sets this variable
@ -11,7 +15,8 @@ endif
CXXFLAGS += $(ADD_CFLAGS) -std=c++0x -Wall -Wno-unused-function -O3 -DASIO_STANDALONE -DVERSION=\"$(VERSION)\" -I. -I.. -I../externals/asio/asio/include -I../externals/popl/include
LDFLAGS = -lvorbis -lvorbisenc -logg -lFLAC
OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o streamSession.o json/jsonrpc.o streamreader/streamUri.o streamreader/streamManager.o streamreader/pcmStream.o streamreader/pipeStream.o streamreader/fileStream.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o ../common/log.o ../common/sampleFormat.o ../message/pcmChunk.o
OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o streamSession.o json/jsonrpc.o streamreader/streamUri.o streamreader/streamManager.o streamreader/pcmStream.o streamreader/pipeStream.o streamreader/fileStream.o streamreader/processStream.o streamreader/airplayStream.o streamreader/spotifyStream.o streamreader/watchdog.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o ../common/log.o ../common/sampleFormat.o ../message/pcmChunk.o
ifeq ($(ENDIAN), BIG)
CXXFLAGS += -DIS_BIG_ENDIAN
@ -75,7 +80,7 @@ clean:
.PHONY: dpkg
#sudo apt-get install build-essential debhelper dh-make dh-systemd quilt fakeroot lintian
dpkg:
dpkg-buildpackage -rfakeroot -b
dpkg-buildpackage -rfakeroot -b -uc -us
dh_clean
ifdef DESTDIR

View file

@ -41,7 +41,10 @@ ControlSession::~ControlSession()
void ControlSession::start()
{
active_ = true;
{
std::lock_guard<std::mutex> activeLock(activeMutex_);
active_ = true;
}
readerThread_ = new thread(&ControlSession::reader, this);
writerThread_ = new thread(&ControlSession::writer, this);
}
@ -49,13 +52,20 @@ void ControlSession::start()
void ControlSession::stop()
{
std::unique_lock<std::mutex> mlock(mutex_);
active_ = false;
{
std::lock_guard<std::mutex> activeLock(activeMutex_);
if (!active_)
return;
active_ = false;
}
try
{
std::error_code ec;
if (socket_)
{
std::lock_guard<std::mutex> socketLock(socketMutex_);
socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
if (ec) logE << "Error in socket shutdown: " << ec.message() << "\n";
socket_->close(ec);
@ -70,6 +80,7 @@ void ControlSession::stop()
if (writerThread_)
{
logD << "joining writerThread\n";
messages_.abort_wait();
writerThread_->join();
delete writerThread_;
}
@ -94,9 +105,12 @@ void ControlSession::sendAsync(const std::string& message)
bool ControlSession::send(const std::string& message) const
{
// logO << "send: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n";
std::unique_lock<std::mutex> mlock(mutex_);
if (!socket_ || !active_)
return false;
std::lock_guard<std::mutex> socketLock(socketMutex_);
{
std::lock_guard<std::mutex> activeLock(activeMutex_);
if (!socket_ || !active_)
return false;
}
asio::streambuf streambuf;
std::ostream request_stream(&streambuf);
request_stream << message << "\r\n";
@ -108,7 +122,6 @@ bool ControlSession::send(const std::string& message) const
void ControlSession::reader()
{
active_ = true;
try
{
std::stringstream message;

View file

@ -76,7 +76,8 @@ protected:
void writer();
std::atomic<bool> active_;
mutable std::mutex mutex_;
mutable std::mutex activeMutex_;
mutable std::mutex socketMutex_;
std::thread* readerThread_;
std::thread* writerThread_;
std::shared_ptr<tcp::socket> socket_;

View file

@ -1,3 +1,18 @@
snapserver (0.10.0) unstable; urgency=low
* Features
-Added support "process" streams:
Snapserver starts a process and reads PCM data from stdout
-Specialized versions for Spotify "spotify" and AirPlay "airplay"
* Bugfixes
-Fixed crash during server shutdown
-Fixed Makefile for FreeBSD
-Fixed building of dpk (unsigned .changes file)
* General
-Speed up client and server shutdown
-- Johannes Pohl <johannes.pohl@badaix.de> Wed, 16 Nov 2016 00:13:37 +0200
snapserver (0.9.0) unstable; urgency=low
* Features

View file

@ -7,7 +7,7 @@
<key>ProgramArguments</key>
<array>
<string>/usr/local/bin/snapserver</string>
<string>-d</string>
<!-- <string>-d</string> -->
</array>
<key>RunAtLoad</key>
<true/>

View file

@ -171,7 +171,7 @@ int main(int argc, char* argv[])
std::thread t(func, &io_service);
while (!g_terminated)
usleep(100*1000);
chronos::sleep(100);
io_service.stop();
t.join();

View file

@ -54,13 +54,13 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk*
bool isDefaultStream(pcmStream == streamManager_->getDefaultStream().get());
std::shared_ptr<const msg::BaseMessage> shared_message(chunk);
std::lock_guard<std::mutex> mlock(sessionsMutex_);
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
for (auto s : sessions_)
{
if (!s->pcmStream() && isDefaultStream)//->getName() == "default")
s->add(shared_message);
s->sendAsync(shared_message);
else if (s->pcmStream().get() == pcmStream)
s->add(shared_message);
s->sendAsync(shared_message);
}
}
@ -73,17 +73,8 @@ void StreamServer::onResync(const PcmStream* pcmStream, double ms)
void StreamServer::onDisconnect(StreamSession* streamSession)
{
std::lock_guard<std::mutex> mlock(sessionsMutex_);
std::shared_ptr<StreamSession> session = nullptr;
for (auto s: sessions_)
{
if (s.get() == streamSession)
{
session = s;
break;
}
}
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
session_ptr session = getStreamSession(streamSession);
if (session == nullptr)
return;
@ -192,10 +183,10 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
clientInfo->config.streamId = streamId;
response = clientInfo->config.streamId;
StreamSession* session = getStreamSession(request.getParam("client").get<string>());
if (session != NULL)
session_ptr session = getStreamSession(request.getParam("client").get<string>());
if (session != nullptr)
{
session->add(stream->getHeader());
session->sendAsync(stream->getHeader());
session->setPcmStream(stream);
}
}
@ -218,8 +209,8 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
serverSettings.setMuted(clientInfo->config.volume.muted);
serverSettings.setLatency(clientInfo->config.latency);
StreamSession* session = getStreamSession(request.getParam("client").get<string>());
if (session != NULL)
session_ptr session = getStreamSession(request.getParam("client").get<string>());
if (session != nullptr)
session->send(&serverSettings);
Config::instance().save();
@ -244,15 +235,15 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer)
{
logD << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
// logD << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
if (baseMessage.type == message_type::kTime)
{
msg::Time timeMsg;
timeMsg.deserialize(baseMessage, buffer);
timeMsg.refersTo = timeMsg.id;
timeMsg.latency = timeMsg.received - timeMsg.sent;
msg::Time* timeMsg = new msg::Time();
timeMsg->deserialize(baseMessage, buffer);
timeMsg->refersTo = timeMsg->id;
timeMsg->latency = timeMsg->received - timeMsg->sent;
// logO << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n";
connection->send(&timeMsg);
connection->sendAsync(timeMsg, true);
// refresh connection state
ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress);
@ -281,13 +272,13 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
else
{
logD << "request kServerSettings\n";
msg::ServerSettings serverSettings;
serverSettings.setVolume(clientInfo->config.volume.percent);
serverSettings.setMuted(clientInfo->config.volume.muted);
serverSettings.setLatency(clientInfo->config.latency);
serverSettings.setBufferMs(settings_.bufferMs);
serverSettings.refersTo = helloMsg.id;
connection->send(&serverSettings);
msg::ServerSettings* serverSettings = new msg::ServerSettings();
serverSettings->setVolume(clientInfo->config.volume.percent);
serverSettings->setMuted(clientInfo->config.volume.muted);
serverSettings->setLatency(clientInfo->config.latency);
serverSettings->setBufferMs(settings_.bufferMs);
serverSettings->refersTo = helloMsg.id;
connection->sendAsync(serverSettings);
}
ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress);
@ -303,16 +294,16 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
// Assign and update stream
PcmStreamPtr stream = streamManager_->getStream(client->config.streamId);
if (stream == nullptr)
if (!stream)
{
stream = streamManager_->getDefaultStream();
client->config.streamId = stream->getUri().id();
client->config.streamId = stream->getId();
}
Config::instance().save();
connection->setPcmStream(stream);
auto headerChunk = stream->getHeader();
connection->send(headerChunk.get());
connection->sendAsync(headerChunk);
json notification = JsonNotification::getJson("Client.OnConnect", client->toJson());
// logO << notification.dump(4) << "\n";
@ -321,16 +312,29 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
}
StreamSession* StreamServer::getStreamSession(const std::string& mac)
session_ptr StreamServer::getStreamSession(StreamSession* streamSession) const
{
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
for (auto session: sessions_)
{
if (session.get() == streamSession)
return session;
}
return nullptr;
}
session_ptr StreamServer::getStreamSession(const std::string& mac) const
{
// logO << "getStreamSession: " << mac << "\n";
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
for (auto session: sessions_)
{
// logO << "getStreamSession, checking: " << session->macAddress << "\n";
if (session->macAddress == mac)
return session.get();
return session;
}
return NULL;
return nullptr;
}
@ -350,19 +354,17 @@ void StreamServer::handleAccept(socket_ptr socket)
setsockopt(socket->native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
/// experimental: turn on tcp::no_delay
// asio::ip::tcp::no_delay option;
// socket->get_option(option);
// logE << "no_delay: " << option.value() << "\n";
socket->set_option(tcp::no_delay(true));
logS(kLogNotice) << "StreamServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl;
shared_ptr<StreamSession> session = make_shared<StreamSession>(this, socket);
{
std::lock_guard<std::mutex> mlock(sessionsMutex_);
session->setBufferMs(settings_.bufferMs);
session->start();
sessions_.insert(session);
}
session->setBufferMs(settings_.bufferMs);
session->start();
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
sessions_.insert(session);
startAccept();
}
@ -376,11 +378,10 @@ void StreamServer::start()
streamManager_.reset(new StreamManager(this, settings_.sampleFormat, settings_.codec, settings_.streamReadMs));
// throw SnapException("xxx");
//TODO: check uniqueness of the stream
for (const auto& streamUri: settings_.pcmStreams)
{
PcmStream* stream = streamManager_->addStream(streamUri);
if (stream != NULL)
PcmStreamPtr stream = streamManager_->addStream(streamUri);
if (stream)
logO << "Stream: " << stream->getUri().toJson() << "\n";
}
streamManager_->start();
@ -399,11 +400,23 @@ void StreamServer::start()
void StreamServer::stop()
{
// std::lock_guard<std::mutex> mlock(sessionsMutex_);
for (auto session: sessions_)//it = sessions_.begin(); it != sessions_.end(); ++it)
if (streamManager_)
{
if (session)
session->stop();
streamManager_->stop();
streamManager_ = nullptr;
}
{
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
for (auto session: sessions_)//it = sessions_.begin(); it != sessions_.end(); ++it)
{
if (session)
{
session->stop();
session = nullptr;
}
}
sessions_.clear();
}
if (controlServer_)
@ -417,11 +430,5 @@ void StreamServer::stop()
acceptor_->cancel();
acceptor_ = nullptr;
}
if (streamManager_)
{
streamManager_->stop();
streamManager_ = nullptr;
}
}

View file

@ -39,7 +39,7 @@
using asio::ip::tcp;
typedef std::shared_ptr<tcp::socket> socket_ptr;
typedef std::shared_ptr<StreamSession> session_ptr;
struct StreamServerSettings
{
@ -96,9 +96,10 @@ public:
private:
void startAccept();
void handleAccept(socket_ptr socket);
StreamSession* getStreamSession(const std::string& mac);
mutable std::mutex sessionsMutex_;
std::set<std::shared_ptr<StreamSession>> sessions_;
session_ptr getStreamSession(const std::string& mac) const;
session_ptr getStreamSession(StreamSession* session) const;
mutable std::recursive_mutex sessionsMutex_;
std::set<session_ptr> sessions_;
asio::io_service* io_service_;
std::shared_ptr<tcp::acceptor> acceptor_;

View file

@ -28,7 +28,7 @@ using namespace std;
StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) :
active_(true), messageReceiver_(receiver), pcmStream_(nullptr)
active_(false), readerThread_(nullptr), writerThread_(nullptr), messageReceiver_(receiver), pcmStream_(nullptr)
{
socket_ = socket;
}
@ -54,45 +54,55 @@ const PcmStreamPtr StreamSession::pcmStream() const
void StreamSession::start()
{
setActive(true);
readerThread_ = new thread(&StreamSession::reader, this);
writerThread_ = new thread(&StreamSession::writer, this);
{
std::lock_guard<std::mutex> activeLock(activeMutex_);
active_ = true;
}
readerThread_.reset(new thread(&StreamSession::reader, this));
writerThread_.reset(new thread(&StreamSession::writer, this));
}
void StreamSession::stop()
{
setActive(false);
std::unique_lock<std::mutex> mlock(mutex_);
{
std::lock_guard<std::mutex> activeLock(activeMutex_);
if (!active_)
return;
active_ = false;
}
try
{
std::error_code ec;
if (socket_)
{
std::lock_guard<std::mutex> socketLock(socketMutex_);
socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
if (ec) logE << "Error in socket shutdown: " << ec.message() << "\n";
socket_->close(ec);
if (ec) logE << "Error in socket close: " << ec.message() << "\n";
}
if (readerThread_)
if (readerThread_ && readerThread_->joinable())
{
logD << "joining readerThread\n";
readerThread_->join();
delete readerThread_;
}
if (writerThread_)
if (writerThread_ && writerThread_->joinable())
{
logD << "joining writerThread\n";
messages_.abort_wait();
writerThread_->join();
delete writerThread_;
}
}
catch(...)
{
}
readerThread_ = NULL;
writerThread_ = NULL;
socket_ = NULL;
readerThread_ = nullptr;
writerThread_ = nullptr;
socket_ = nullptr;
logD << "StreamSession stopped\n";
}
@ -108,14 +118,26 @@ void StreamSession::socketRead(void* _to, size_t _bytes)
}
void StreamSession::add(const shared_ptr<const msg::BaseMessage>& message)
void StreamSession::sendAsync(const msg::BaseMessage* message, bool sendNow)
{
std::shared_ptr<const msg::BaseMessage> shared_message(message);
sendAsync(shared_message, sendNow);
}
void StreamSession::sendAsync(const shared_ptr<const msg::BaseMessage>& message, bool sendNow)
{
if (!message)
return;
while (messages_.size() > 100)// chunk->getDuration() > 10000)
//the writer will take care about old messages
while (messages_.size() > 2000)// chunk->getDuration() > 10000)
messages_.pop();
messages_.push(message);
if (sendNow)
messages_.push_front(message);
else
messages_.push(message);
}
@ -135,9 +157,12 @@ bool StreamSession::send(const msg::BaseMessage* message) const
{
//TODO on exception: set active = false
// logO << "send: " << message->type << ", size: " << message->getSize() << ", id: " << message->id << ", refers: " << message->refersTo << "\n";
std::unique_lock<std::mutex> mlock(mutex_);
if (!socket_ || !active_)
return false;
std::lock_guard<std::mutex> socketLock(socketMutex_);
{
std::lock_guard<std::mutex> activeLock(activeMutex_);
if (!socket_ || !active_)
return false;
}
asio::streambuf streambuf;
std::ostream stream(&streambuf);
tv t;
@ -165,11 +190,14 @@ void StreamSession::getNextMessage()
// logO << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n";
if (baseMessage.size > buffer.size())
buffer.resize(baseMessage.size);
// {
// std::lock_guard<std::mutex> socketLock(socketMutex_);
socketRead(&buffer[0], baseMessage.size);
// }
tv t;
baseMessage.received = t;
if (messageReceiver_ != NULL)
if (active_ && (messageReceiver_ != NULL))
messageReceiver_->onMessageReceived(this, baseMessage, &buffer[0]);
}
@ -187,7 +215,9 @@ void StreamSession::reader()
{
logS(kLogErr) << "Exception in StreamSession::reader(): " << e.what() << endl;
}
setActive(false);
if (active_ && (messageReceiver_ != NULL))
messageReceiver_->onDisconnect(this);
}
@ -224,16 +254,9 @@ void StreamSession::writer()
{
logS(kLogErr) << "Exception in StreamSession::writer(): " << e.what() << endl;
}
setActive(false);
}
void StreamSession::setActive(bool active)
{
std::lock_guard<std::mutex> mlock(activeMutex_);
if (active_ && !active && (messageReceiver_ != NULL))
if (active_ && (messageReceiver_ != NULL))
messageReceiver_->onDisconnect(this);
active_ = active;
}

View file

@ -22,11 +22,11 @@
#include <string>
#include <thread>
#include <atomic>
#include <mutex>
#include <memory>
#include <asio.hpp>
#include <condition_variable>
#include <set>
#include <mutex>
#include "message/message.h"
#include "common/queue.h"
#include "streamreader/streamManager.h"
@ -66,7 +66,8 @@ public:
bool send(const msg::BaseMessage* message) const;
/// Sends a message to the client (asynchronous)
void add(const std::shared_ptr<const msg::BaseMessage>& message);
void sendAsync(const std::shared_ptr<const msg::BaseMessage>& message, bool sendNow = false);
void sendAsync(const msg::BaseMessage* message, bool sendNow = false);
bool active() const;
@ -88,14 +89,13 @@ protected:
void getNextMessage();
void reader();
void writer();
void setActive(bool active);
mutable std::mutex activeMutex_;
std::atomic<bool> active_;
mutable std::mutex mutex_;
std::thread* readerThread_;
std::thread* writerThread_;
std::unique_ptr<std::thread> readerThread_;
std::unique_ptr<std::thread> writerThread_;
mutable std::mutex socketMutex_;
std::shared_ptr<tcp::socket> socket_;
MessageReceiver* messageReceiver_;
Queue<std::shared_ptr<const msg::BaseMessage>> messages_;

View file

@ -0,0 +1,87 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include "airplayStream.h"
#include "common/snapException.h"
#include "common/utils.h"
#include "common/log.h"
using namespace std;
AirplayStream::AirplayStream(PcmListener* pcmListener, const StreamUri& uri) : ProcessStream(pcmListener, uri), port_(5000)
{
logStderr_ = true;
sampleFormat_ = SampleFormat("44100:16:2");
uri_.query["sampleformat"] = sampleFormat_.getFormat();
port_ = cpt::stoul(uri_.getQuery("port", "5000"));
string devicename = uri_.getQuery("devicename", "Snapcast");
params_wo_port_ = "--name=\"" + devicename + "\" --output=stdout";
params_ = params_wo_port_ + " --port=" + cpt::to_string(port_);
}
AirplayStream::~AirplayStream()
{
}
void AirplayStream::initExeAndPath(const std::string& filename)
{
path_ = "";
exe_ = findExe(filename);
if (!fileExists(exe_) || (exe_ == "/"))
{
exe_ = findExe("shairport-sync");
if (!fileExists(exe_))
throw SnapException("shairport-sync not found");
}
if (exe_.find("/") != string::npos)
{
path_ = exe_.substr(0, exe_.find_last_of("/") + 1);
exe_ = exe_.substr(exe_.find_last_of("/") + 1);
}
}
void AirplayStream::onStderrMsg(const char* buffer, size_t n)
{
string logmsg = trim_copy(string(buffer, n));
if (logmsg.empty())
return;
logO << "(" << getName() << ") " << logmsg << "\n";
if (logmsg.find("Is another Shairport Sync running on this device") != string::npos)
{
logE << "Seem there is another Shairport Sync runnig on port " << port_ << ", switching to port " << port_ + 1 << "\n";
++port_;
params_ = params_wo_port_ + " --port=" + cpt::to_string(port_);
}
else if (logmsg.find("Invalid audio output specified") != string::npos)
{
logE << "shairport sync compiled without stdout audio backend\n";
logE << "build with: \"./configure --with-stdout --with-avahi --with-ssl=openssl --with-metadata\"\n";
}
}

View file

@ -0,0 +1,47 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef AIRPLAY_STREAM_H
#define AIRPLAY_STREAM_H
#include "processStream.h"
/// Starts shairport-sync and reads PCM data from stdout
/**
* Starts librespot, reads PCM data from stdout, and passes the data to an encoder.
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
* usage:
* snapserver -s "airplay:///shairport-sync?name=Airplay[&devicename=Snapcast][&port=5000]"
*/
class AirplayStream : public ProcessStream
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
AirplayStream(PcmListener* pcmListener, const StreamUri& uri);
virtual ~AirplayStream();
protected:
virtual void onStderrMsg(const char* buffer, size_t n);
virtual void initExeAndPath(const std::string& filename);
size_t port_;
std::string params_wo_port_;
};
#endif

View file

@ -19,7 +19,6 @@
#include <memory>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "fileStream.h"
#include "encoder/encoderFactory.h"
@ -85,6 +84,7 @@ void FileStream::worker()
ifs.read(chunk->payload + count, toRead - count);
encoder_->encode(chunk.get());
if (!active_) break;
nextTick += pcmReadMs_;
chronos::addUs(tvChunk, pcmReadMs_ * 1000);
long currentTick = chronos::getTickCount();
@ -92,7 +92,8 @@ void FileStream::worker()
if (nextTick >= currentTick)
{
// logO << "sleep: " << nextTick - currentTick << "\n";
usleep((nextTick - currentTick) * 1000);
if (!sleep(nextTick - currentTick))
break;
}
else
{
@ -105,7 +106,7 @@ void FileStream::worker()
}
catch(const std::exception& e)
{
logE << "Exception: " << e.what() << std::endl;
logE << "(FileStream) Exception: " << e.what() << std::endl;
}
}
}

View file

@ -16,8 +16,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef FILE_READER_H
#define FILE_READER_H
#ifndef FILE_STREAM_H
#define FILE_STREAM_H
#include "pcmStream.h"
#include <fstream>
@ -37,7 +37,7 @@ public:
virtual ~FileStream();
protected:
void worker();
virtual void worker();
std::ifstream ifs;
};

View file

@ -19,7 +19,6 @@
#include <memory>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "encoder/encoderFactory.h"
#include "common/snapException.h"
@ -78,6 +77,12 @@ const std::string& PcmStream::getName() const
}
const std::string& PcmStream::getId() const
{
return getName();
}
const SampleFormat& PcmStream::getSampleFormat() const
{
return sampleFormat_;
@ -88,19 +93,29 @@ void PcmStream::start()
{
logD << "PcmStream start: " << sampleFormat_.getFormat() << "\n";
encoder_->init(this, sampleFormat_);
active_ = true;
readerThread_ = thread(&PcmStream::worker, this);
active_ = true;
thread_ = thread(&PcmStream::worker, this);
}
void PcmStream::stop()
{
if (active_)
{
active_ = false;
readerThread_.join();
}
if (!active_ && !thread_.joinable())
return;
active_ = false;
cv_.notify_one();
if (thread_.joinable())
thread_.join();
}
bool PcmStream::sleep(int32_t ms)
{
if (ms < 0)
return true;
std::unique_lock<std::mutex> lck(mtx_);
return (!cv_.wait_for(lck, std::chrono::milliseconds(ms), [this] { return !active_; }));
}
@ -115,7 +130,8 @@ void PcmStream::setState(const ReaderState& newState)
if (newState != state_)
{
state_ = newState;
pcmListener_->onStateChanged(this, newState);
if (pcmListener_)
pcmListener_->onStateChanged(this, newState);
}
}
@ -129,7 +145,8 @@ void PcmStream::onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, dou
chunk->timestamp.sec = tvEncodedChunk_.tv_sec;
chunk->timestamp.usec = tvEncodedChunk_.tv_usec;
chronos::addUs(tvEncodedChunk_, duration * 1000);
pcmListener_->onChunkRead(this, chunk, duration);
if (pcmListener_)
pcmListener_->onChunkRead(this, chunk, duration);
}
@ -145,7 +162,7 @@ json PcmStream::toJson() const
json j = {
{"uri", uri_.toJson()},
{"id", uri_.id()},
{"id", getId()},
{"status", state}
};
return j;

View file

@ -16,12 +16,14 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef PCM_READER_H
#define PCM_READER_H
#ifndef PCM_STREAM_H
#define PCM_STREAM_H
#include <thread>
#include <atomic>
#include <string>
#include <mutex>
#include <condition_variable>
#include <map>
#include "streamUri.h"
#include "encoder/encoder.h"
@ -76,6 +78,7 @@ public:
virtual const StreamUri& getUri() const;
virtual const std::string& getName() const;
virtual const std::string& getId() const;
virtual const SampleFormat& getSampleFormat() const;
virtual ReaderState getState() const;
@ -83,12 +86,16 @@ public:
protected:
std::condition_variable cv_;
std::mutex mtx_;
std::thread thread_;
std::atomic<bool> active_;
virtual void worker() = 0;
virtual bool sleep(int32_t ms);
void setState(const ReaderState& newState);
timeval tvEncodedChunk_;
std::atomic<bool> active_;
std::thread readerThread_;
PcmListener* pcmListener_;
StreamUri uri_;
SampleFormat sampleFormat_;

View file

@ -24,9 +24,9 @@
#include "pipeStream.h"
#include "encoder/encoderFactory.h"
#include "common/log.h"
#include "common/snapException.h"
#include "common/strCompat.h"
#include "common/log.h"
using namespace std;
@ -37,9 +37,7 @@ using namespace std;
PipeStream::PipeStream(PcmListener* pcmListener, const StreamUri& uri) : PcmStream(pcmListener, uri), fd_(-1)
{
umask(0);
string mode = uri_.query["mode"];
if (mode.empty())
mode = "create";
string mode = uri_.getQuery("mode", "create");
logO << "PipeStream mode: " << mode << "\n";
if ((mode != "read") && (mode != "create"))
@ -90,7 +88,8 @@ void PipeStream::worker()
if (count < 0)
{
setState(kIdle);
usleep(100*1000);
if (!sleep(100))
break;
}
else if (count == 0)
throw SnapException("end of file");
@ -99,16 +98,21 @@ void PipeStream::worker()
}
while ((len < toRead) && active_);
if (!active_) break;
encoder_->encode(chunk.get());
if (!active_) break;
nextTick += pcmReadMs_;
chronos::addUs(tvChunk, pcmReadMs_ * 1000);
long currentTick = chronos::getTickCount();
if (nextTick >= currentTick)
{
// logO << "sleep: " << nextTick - currentTick << "\n";
setState(kPlaying);
usleep((nextTick - currentTick) * 1000);
if (!sleep(nextTick - currentTick))
break;
}
else
{
@ -121,8 +125,9 @@ void PipeStream::worker()
}
catch(const std::exception& e)
{
logE << "Exception: " << e.what() << std::endl;
usleep(100*1000);
logE << "(PipeStream) Exception: " << e.what() << std::endl;
if (!sleep(100))
break;
}
}
}

View file

@ -37,7 +37,7 @@ public:
virtual ~PipeStream();
protected:
void worker();
virtual void worker();
int fd_;
};

View file

@ -0,0 +1,228 @@
#ifndef TINY_PROCESS_LIBRARY_HPP_
#define TINY_PROCESS_LIBRARY_HPP_
#include <string>
#include <mutex>
#include <sys/wait.h>
#include <cstdlib>
#include <unistd.h>
#include <signal.h>
// Forked from: https://github.com/eidheim/tiny-process-library
// Copyright (c) 2015-2016 Ole Christian Eidheim
// Thanks, Christian :-)
///Create a new process given command and run path.
///Thus, at the moment, if read_stdout==nullptr, read_stderr==nullptr and open_stdin==false,
///the stdout, stderr and stdin are sent to the parent process instead.
///Compile with -DMSYS_PROCESS_USE_SH to run command using "sh -c [command]" on Windows as well.
class Process {
public:
typedef int fd_type;
Process(const std::string &command, const std::string &path = "") : closed(true)
{
open(command, path);
}
~Process()
{
close_fds();
}
///Get the process id of the started process.
pid_t getPid()
{
return pid;
}
///Write to stdin. Convenience function using write(const char *, size_t).
bool write(const std::string &data)
{
return write(data.c_str(), data.size());
}
///Wait until process is finished, and return exit status.
int get_exit_status()
{
if (pid <= 0)
return -1;
int exit_status;
waitpid(pid, &exit_status, 0);
{
std::lock_guard<std::mutex> lock(close_mutex);
closed=true;
}
close_fds();
if (exit_status >= 256)
exit_status = exit_status>>8;
return exit_status;
}
///Write to stdin.
bool write(const char *bytes, size_t n)
{
std::lock_guard<std::mutex> lock(stdin_mutex);
if (::write(stdin_fd, bytes, n)>=0)
return true;
else
return false;
}
///Close stdin. If the process takes parameters from stdin, use this to notify that all parameters have been sent.
void close_stdin()
{
std::lock_guard<std::mutex> lock(stdin_mutex);
if (pid > 0)
close(stdin_fd);
}
///Kill the process.
void kill(bool force=false)
{
std::lock_guard<std::mutex> lock(close_mutex);
if (pid > 0 && !closed)
{
if(force)
::kill(-pid, SIGTERM);
else
::kill(-pid, SIGINT);
}
}
///Kill a given process id. Use kill(bool force) instead if possible.
static void kill(pid_t id, bool force=false)
{
if (id <= 0)
return;
if (force)
::kill(-id, SIGTERM);
else
::kill(-id, SIGINT);
}
fd_type getStdout()
{
return stdout_fd;
}
fd_type getStderr()
{
return stderr_fd;
}
fd_type getStdin()
{
return stdin_fd;
}
private:
pid_t pid;
bool closed;
std::mutex close_mutex;
std::mutex stdin_mutex;
fd_type stdout_fd, stderr_fd, stdin_fd;
void closePipe(int pipefd[2])
{
close(pipefd[0]);
close(pipefd[1]);
}
pid_t open(const std::string &command, const std::string &path)
{
int stdin_p[2], stdout_p[2], stderr_p[2];
if (pipe(stdin_p) != 0)
return -1;
if (pipe(stdout_p) != 0)
{
closePipe(stdin_p);
return -1;
}
if (pipe(stderr_p) != 0)
{
closePipe(stdin_p);
closePipe(stdout_p);
return -1;
}
pid = fork();
if (pid < 0)
{
closePipe(stdin_p);
closePipe(stdout_p);
closePipe(stderr_p);
return pid;
}
else if (pid == 0)
{
dup2(stdin_p[0], 0);
dup2(stdout_p[1], 1);
dup2(stderr_p[1], 2);
closePipe(stdin_p);
closePipe(stdout_p);
closePipe(stderr_p);
//Based on http://stackoverflow.com/a/899533/3808293
int fd_max = sysconf(_SC_OPEN_MAX);
for (int fd=3; fd<fd_max; fd++)
close(fd);
setpgid(0, 0);
if (!path.empty())
{
auto path_escaped = path;
size_t pos=0;
//Based on https://www.reddit.com/r/cpp/comments/3vpjqg/a_new_platform_independent_process_library_for_c11/cxsxyb7
while ((pos = path_escaped.find('\'', pos)) != std::string::npos)
{
path_escaped.replace(pos, 1, "'\\''");
pos += 4;
}
execl("/bin/sh", "sh", "-c", ("cd '" + path_escaped + "' && " + command).c_str(), NULL);
}
else
execl("/bin/sh", "sh", "-c", command.c_str(), NULL);
_exit(EXIT_FAILURE);
}
close(stdin_p[0]);
close(stdout_p[1]);
close(stderr_p[1]);
stdin_fd = stdin_p[1];
stdout_fd = stdout_p[0];
stderr_fd = stderr_p[0];
closed = false;
return pid;
}
void close_fds()
{
close_stdin();
if (pid > 0)
{
close(stdout_fd);
close(stderr_fd);
}
}
};
#endif // TINY_PROCESS_LIBRARY_HPP_

View file

@ -0,0 +1,221 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include <sys/stat.h>
#include <limits.h>
#include <fcntl.h>
#include "processStream.h"
#include "common/snapException.h"
#include "common/utils.h"
#include "common/log.h"
using namespace std;
ProcessStream::ProcessStream(PcmListener* pcmListener, const StreamUri& uri) : PcmStream(pcmListener, uri), path_(""), process_(nullptr)
{
params_ = uri_.getQuery("params");
logStderr_ = (uri_.getQuery("logStderr", "false") == "true");
}
ProcessStream::~ProcessStream()
{
if (process_)
process_->kill();
}
bool ProcessStream::fileExists(const std::string& filename)
{
struct stat buffer;
return (stat(filename.c_str(), &buffer) == 0);
}
std::string ProcessStream::findExe(const std::string& filename)
{
/// check if filename exists
if (fileExists(filename))
return filename;
std::string exe = filename;
if (exe.find("/") != string::npos)
exe = exe.substr(exe.find_last_of("/") + 1);
/// check with "whereis"
string whereis = execGetOutput("whereis " + exe);
if (whereis.find(":") != std::string::npos)
{
whereis = trim_copy(whereis.substr(whereis.find(":") + 1));
if (!whereis.empty())
return whereis;
}
/// check in the same path as this binary
char buff[PATH_MAX];
char szTmp[32];
sprintf(szTmp, "/proc/%d/exe", getpid());
ssize_t len = readlink(szTmp, buff, sizeof(buff)-1);
if (len != -1)
{
buff[len] = '\0';
return string(buff) + "/" + exe;
}
return "";
}
void ProcessStream::initExeAndPath(const std::string& filename)
{
path_ = "";
exe_ = findExe(filename);
if (exe_.find("/") != string::npos)
{
path_ = exe_.substr(0, exe_.find_last_of("/") + 1);
exe_ = exe_.substr(exe_.find_last_of("/") + 1);
}
if (!fileExists(path_ + exe_))
throw SnapException("file not found: \"" + filename + "\"");
}
void ProcessStream::start()
{
initExeAndPath(uri_.path);
PcmStream::start();
}
void ProcessStream::stop()
{
if (process_)
process_->kill();
PcmStream::stop();
/// thread is detached, so it is not joinable
if (stderrReaderThread_.joinable())
stderrReaderThread_.join();
}
void ProcessStream::onStderrMsg(const char* buffer, size_t n)
{
if (logStderr_)
{
string line = trim_copy(string(buffer, n));
if ((line.find('\0') == string::npos) && !line.empty())
logO << "(" << getName() << ") " << line << "\n";
}
}
void ProcessStream::stderrReader()
{
size_t buffer_size = 8192;
auto buffer = std::unique_ptr<char[]>(new char[buffer_size]);
ssize_t n;
stringstream message;
while (active_ && (n=read(process_->getStderr(), buffer.get(), buffer_size)) > 0)
onStderrMsg(buffer.get(), n);
}
void ProcessStream::worker()
{
timeval tvChunk;
std::unique_ptr<msg::PcmChunk> chunk(new msg::PcmChunk(sampleFormat_, pcmReadMs_));
setState(kPlaying);
while (active_)
{
process_.reset(new Process(path_ + exe_ + " " + params_, path_));
int flags = fcntl(process_->getStdout(), F_GETFL, 0);
fcntl(process_->getStdout(), F_SETFL, flags | O_NONBLOCK);
stderrReaderThread_ = thread(&ProcessStream::stderrReader, this);
stderrReaderThread_.detach();
gettimeofday(&tvChunk, NULL);
tvEncodedChunk_ = tvChunk;
long nextTick = chronos::getTickCount();
try
{
while (active_)
{
chunk->timestamp.sec = tvChunk.tv_sec;
chunk->timestamp.usec = tvChunk.tv_usec;
int toRead = chunk->payloadSize;
int len = 0;
do
{
int count = read(process_->getStdout(), chunk->payload + len, toRead - len);
if (count < 0)
{
setState(kIdle);
if (!sleep(100))
break;
}
else if (count == 0)
throw SnapException("end of file");
else
len += count;
}
while ((len < toRead) && active_);
if (!active_) break;
encoder_->encode(chunk.get());
if (!active_) break;
nextTick += pcmReadMs_;
chronos::addUs(tvChunk, pcmReadMs_ * 1000);
long currentTick = chronos::getTickCount();
if (nextTick >= currentTick)
{
setState(kPlaying);
if (!sleep(nextTick - currentTick))
break;
}
else
{
gettimeofday(&tvChunk, NULL);
tvEncodedChunk_ = tvChunk;
pcmListener_->onResync(this, currentTick - nextTick);
nextTick = currentTick;
}
}
}
catch(const std::exception& e)
{
logE << "(ProcessStream) Exception: " << e.what() << std::endl;
process_->kill();
if (!sleep(30000))
break;
}
}
}

View file

@ -0,0 +1,63 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef PROCESS_STREAM_H
#define PROCESS_STREAM_H
#include <memory>
#include <string>
#include "pcmStream.h"
#include "process.hpp"
/// Starts an external process and reads and PCM data from stdout
/**
* Starts an external process, reads PCM data from stdout, and passes the data to an encoder.
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
*/
class ProcessStream : public PcmStream
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
ProcessStream(PcmListener* pcmListener, const StreamUri& uri);
virtual ~ProcessStream();
virtual void start();
virtual void stop();
protected:
std::string exe_;
std::string path_;
std::string params_;
std::unique_ptr<Process> process_;
std::thread stderrReaderThread_;
bool logStderr_;
virtual void worker();
virtual void stderrReader();
virtual void onStderrMsg(const char* buffer, size_t n);
virtual void initExeAndPath(const std::string& filename);
bool fileExists(const std::string& filename);
std::string findExe(const std::string& filename);
};
#endif

View file

@ -0,0 +1,124 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include "spotifyStream.h"
#include "common/snapException.h"
#include "common/utils.h"
#include "common/log.h"
using namespace std;
SpotifyStream::SpotifyStream(PcmListener* pcmListener, const StreamUri& uri) : ProcessStream(pcmListener, uri)
{
sampleFormat_ = SampleFormat("44100:16:2");
uri_.query["sampleformat"] = sampleFormat_.getFormat();
string username = uri_.getQuery("username", "");
string password = uri_.getQuery("password", "");
string bitrate = uri_.getQuery("bitrate", "320");
string devicename = uri_.getQuery("devicename", "Snapcast");
if (username.empty())
throw SnapException("missing parameter \"username\"");
if (password.empty())
throw SnapException("missing parameter \"password\"");
params_ = "--name \"" + devicename + "\" --username \"" + username + "\" --password \"" + password + "\" --bitrate " + bitrate + " --backend stdout";
// logO << "params: " << params << "\n";
}
SpotifyStream::~SpotifyStream()
{
}
void SpotifyStream::initExeAndPath(const std::string& filename)
{
path_ = "";
exe_ = findExe(filename);
if (!fileExists(exe_) || (exe_ == "/"))
{
exe_ = findExe("librespot");
if (!fileExists(exe_))
throw SnapException("librespot not found");
}
if (exe_.find("/") != string::npos)
{
path_ = exe_.substr(0, exe_.find_last_of("/") + 1);
exe_ = exe_.substr(exe_.find_last_of("/") + 1);
}
/// kill if it's already running
execGetOutput("killall " + exe_);
}
void SpotifyStream::onStderrMsg(const char* buffer, size_t n)
{
/// Watch will kill librespot if there was no message received for 130min
// 2016-11-02 22-05-15 [out] TRACE:librespot::stream: allocated stream 3580
// 2016-11-02 22-05-15 [Debug] DEBUG:librespot::audio_file2: Got channel 3580
// 2016-11-02 22-06-39 [out] DEBUG:librespot::spirc: kMessageTypeHello "SM-G901F" 5e1ffdd73f0d1741c4a173d5b238826464ca8e2f 1 0
// 2016-11-02 22-06-39 [out] DEBUG:librespot::spirc: kMessageTypeNotify "Snapcast" 68724ecccd67781303655c49a73b74c5968667b1 123 1478120652755
// 2016-11-02 22-06-40 [out] DEBUG:librespot::spirc: kMessageTypeNotify "SM-G901F" 5e1ffdd73f0d1741c4a173d5b238826464ca8e2f 1 0
// 2016-11-02 22-06-41 [out] DEBUG:librespot::spirc: kMessageTypePause "SM-G901F" 5e1ffdd73f0d1741c4a173d5b238826464ca8e2f 2 0
// 2016-11-02 22-06-42 [out] DEBUG:librespot::spirc: kMessageTypeNotify "Snapcast" 68724ecccd67781303655c49a73b74c5968667b1 124 1478120801615
// 2016-11-02 22-06-47 [out] DEBUG:librespot::spirc: kMessageTypeNotify "SM-G901F" 5e1ffdd73f0d1741c4a173d5b238826464ca8e2f 2 1478120801615
// 2016-11-02 22-35-10 [out] DEBUG:librespot::spirc: kMessageTypeNotify "Snapcast" 68724ecccd67781303655c49a73b74c5968667b1 125 1478120801615
// 2016-11-02 23-36-06 [out] DEBUG:librespot::spirc: kMessageTypeNotify "Snapcast" 68724ecccd67781303655c49a73b74c5968667b1 126 1478120801615
// 2016-11-03 01-37-08 [out] DEBUG:librespot::spirc: kMessageTypeNotify "Snapcast" 68724ecccd67781303655c49a73b74c5968667b1 127 1478120801615
// 2016-11-03 02-38-13 [out] DEBUG:librespot::spirc: kMessageTypeNotify "Snapcast" 68724ecccd67781303655c49a73b74c5968667b1 128 1478120801615
// killall librespot
// 2016-11-03 09-00-18 [out] INFO:librespot::main_helper: librespot 6fa4e4d (2016-09-21). Built on 2016-10-27.
// 2016-11-03 09-00-18 [out] INFO:librespot::session: Connecting to AP lon3-accesspoint-a34.ap.spotify.com:443
// 2016-11-03 09-00-18 [out] INFO:librespot::session: Authenticated !
watchdog_->trigger();
string logmsg = trim_copy(string(buffer, n));
if ((logmsg.find("allocated stream") == string::npos) &&
(logmsg.find("Got channel") == string::npos) &&
(logmsg.find('\0') == string::npos) &&
(logmsg.size() > 4))
{
logO << "(" << getName() << ") " << logmsg << "\n";
}
}
void SpotifyStream::stderrReader()
{
watchdog_.reset(new Watchdog(this));
/// 130min
watchdog_->start(130*60*1000);
ProcessStream::stderrReader();
}
void SpotifyStream::onTimeout(const Watchdog* watchdog, size_t ms)
{
logE << "Spotify timeout: " << ms / 1000 << "\n";
if (process_)
process_->kill();
}

View file

@ -0,0 +1,51 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef SPOTIFY_STREAM_H
#define SPOTIFY_STREAM_H
#include "processStream.h"
#include "watchdog.h"
/// Starts librespot and reads PCM data from stdout
/**
* Starts librespot, reads PCM data from stdout, and passes the data to an encoder.
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
* usage:
* snapserver -s "spotify:///librespot?name=Spotify&username=<my username>&password=<my password>[&devicename=Snapcast][&bitrate=320]"
*/
class SpotifyStream : public ProcessStream, WatchdogListener
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
SpotifyStream(PcmListener* pcmListener, const StreamUri& uri);
virtual ~SpotifyStream();
protected:
std::unique_ptr<Watchdog> watchdog_;
virtual void stderrReader();
virtual void onStderrMsg(const char* buffer, size_t n);
virtual void initExeAndPath(const std::string& filename);
virtual void onTimeout(const Watchdog* watchdog, size_t ms);
};
#endif

View file

@ -17,6 +17,9 @@
***/
#include "streamManager.h"
#include "airplayStream.h"
#include "spotifyStream.h"
#include "processStream.h"
#include "pipeStream.h"
#include "fileStream.h"
#include "common/utils.h"
@ -33,7 +36,7 @@ StreamManager::StreamManager(PcmListener* pcmListener, const std::string& defaul
}
PcmStream* StreamManager::addStream(const std::string& uri)
PcmStreamPtr StreamManager::addStream(const std::string& uri)
{
StreamUri streamUri(uri);
@ -51,23 +54,44 @@ PcmStream* StreamManager::addStream(const std::string& uri)
// for (auto kv: streamUri.query)
// logD << "key: '" << kv.first << "' value: '" << kv.second << "'\n";
PcmStreamPtr stream(nullptr);
if (streamUri.scheme == "pipe")
{
streams_.push_back(make_shared<PipeStream>(pcmListener_, streamUri));
return streams_.back().get();
stream = make_shared<PipeStream>(pcmListener_, streamUri);
}
else if (streamUri.scheme == "file")
{
streams_.push_back(make_shared<FileStream>(pcmListener_, streamUri));
return streams_.back().get();
stream = make_shared<FileStream>(pcmListener_, streamUri);
}
else if (streamUri.scheme == "process")
{
stream = make_shared<ProcessStream>(pcmListener_, streamUri);
}
else if (streamUri.scheme == "spotify")
{
stream = make_shared<SpotifyStream>(pcmListener_, streamUri);
}
else if (streamUri.scheme == "airplay")
{
stream = make_shared<AirplayStream>(pcmListener_, streamUri);
}
else
{
throw SnapException("Unknown stream type: " + streamUri.scheme);
}
return NULL;
if (stream)
{
for (auto s: streams_)
{
if (s->getName() == stream->getName())
throw SnapException("Stream with name \"" + stream->getName() + "\" already exists");
}
streams_.push_back(stream);
}
return stream;
}
@ -90,7 +114,7 @@ const PcmStreamPtr StreamManager::getStream(const std::string& id)
{
for (auto stream: streams_)
{
if (stream->getUri().id() == id)
if (stream->getId() == id)
return stream;
}
return nullptr;

View file

@ -13,7 +13,7 @@ class StreamManager
public:
StreamManager(PcmListener* pcmListener, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultReadBufferMs = 20);
PcmStream* addStream(const std::string& uri);
PcmStreamPtr addStream(const std::string& uri);
void start();
void stop();
const std::vector<PcmStreamPtr>& getStreams();

View file

@ -41,15 +41,6 @@ StreamUri::StreamUri(const std::string& streamUri)
string decodedUri = uriDecode(uri);
logD << "StreamUri: " << decodedUri << "\n";
id_ = decodedUri;
pos = id_.find('?');
if (pos != string::npos)
id_ = id_.substr(0, pos);
pos = id_.find('#');
if (pos != string::npos)
id_ = id_.substr(0, pos);
logD << "id: '" << id_ << "'\n";
string tmp(decodedUri);
pos = tmp.find(':');
@ -98,8 +89,6 @@ StreamUri::StreamUri(const std::string& streamUri)
string key = trim_copy(kv.substr(0, pos));
string value = trim_copy(kv.substr(pos+1));
query[key] = value;
if (key == "id")
id_ = value;
}
}
}
@ -119,7 +108,10 @@ json StreamUri::toJson() const
}
std::string StreamUri::id() const
std::string StreamUri::getQuery(const std::string& key, const std::string& def) const
{
return id_;
auto iter = query.find(key);
if (iter != query.end())
return iter->second;
return def;
}

View file

@ -26,6 +26,7 @@
using json = nlohmann::json;
// scheme:[//[user:password@]host[:port]][/]path[?query][#fragment]
struct StreamUri
{
StreamUri(const std::string& uri);
@ -47,9 +48,7 @@ struct StreamUri
std::string id() const;
json toJson() const;
private:
std::string id_;
std::string getQuery(const std::string& key, const std::string& def = "") const;
};

View file

@ -0,0 +1,83 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include "watchdog.h"
#include <chrono>
using namespace std;
Watchdog::Watchdog(WatchdogListener* listener) : listener_(listener), thread_(nullptr), active_(false)
{
}
Watchdog::~Watchdog()
{
stop();
}
void Watchdog::start(size_t timeoutMs)
{
timeoutMs_ = timeoutMs;
if (!thread_ || !active_)
{
active_ = true;
thread_.reset(new thread(&Watchdog::worker, this));
}
else
trigger();
}
void Watchdog::stop()
{
active_ = false;
trigger();
if (thread_ && thread_->joinable())
thread_->join();
thread_ = nullptr;
}
void Watchdog::trigger()
{
// std::unique_lock<std::mutex> lck(mtx_);
cv_.notify_one();
}
void Watchdog::worker()
{
while (active_)
{
std::unique_lock<std::mutex> lck(mtx_);
if (cv_.wait_for(lck, std::chrono::milliseconds(timeoutMs_)) == std::cv_status::timeout)
{
if (listener_)
{
listener_->onTimeout(this, timeoutMs_);
break;
}
}
}
active_ = false;
}

View file

@ -0,0 +1,63 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef WATCH_DOG_H
#define WATCH_DOG_H
#include <mutex>
#include <thread>
#include <memory>
#include <atomic>
#include <condition_variable>
class Watchdog;
class WatchdogListener
{
public:
virtual void onTimeout(const Watchdog* watchdog, size_t ms) = 0;
};
/// Watchdog
class Watchdog
{
public:
Watchdog(WatchdogListener* listener = nullptr);
virtual ~Watchdog();
void start(size_t timeoutMs);
void stop();
void trigger();
private:
WatchdogListener* listener_;
std::condition_variable cv_;
std::mutex mtx_;
std::unique_ptr<std::thread> thread_;
size_t timeoutMs_;
std::atomic<bool> active_;
void worker();
};
#endif