21 #ifdef USE_DASHBOARD_IMPORT 28 static const char *
const TAG =
"mqtt";
37 ESP_LOGCONFIG(TAG,
"Setting up MQTT...");
39 [
this](
const char *topic,
const char *payload,
size_t len,
size_t index,
size_t total) {
47 if (len + index == total) {
71 "esphome/discover", [
this](
const std::string &topic,
const std::string &payload) { this->
send_device_info_(); },
74 std::string topic =
"esphome/ping/";
77 topic, [
this](
const std::string &topic,
const std::string &payload) { this->
send_device_info_(); }, 2);
88 std::string topic =
"esphome/discover/";
108 root[
"version"] = ESPHOME_VERSION;
112 root[
"platform"] =
"ESP8266";
115 root[
"platform"] =
"ESP32";
118 root[
"platform"] = lt_cpu_get_model_name();
121 root[
"board"] = ESPHOME_BOARD;
122 #if defined(USE_WIFI) 123 root[
"network"] =
"wifi";
124 #elif defined(USE_ETHERNET) 125 root[
"network"] =
"ethernet";
128 #ifdef ESPHOME_PROJECT_NAME 129 root[
"project_name"] = ESPHOME_PROJECT_NAME;
130 root[
"project_version"] = ESPHOME_PROJECT_VERSION;
131 #endif // ESPHOME_PROJECT_NAME 133 #ifdef USE_DASHBOARD_IMPORT 138 root[
"api_encryption"] =
"Noise_NNpsk0_25519_ChaChaPoly_SHA256";
145 ESP_LOGCONFIG(TAG,
"MQTT:");
151 ESP_LOGCONFIG(TAG,
" Discovery IP enabled");
157 ESP_LOGCONFIG(TAG,
" Topic Prefix: '%s'", this->
topic_prefix_.c_str());
169 subscription.subscribed =
false;
170 subscription.resubscribe_timeout = 0;
188 this->
ip_ = network::IPAddress(&addr);
192 case ERR_INPROGRESS: {
194 ESP_LOGD(TAG,
"Resolving MQTT broker IP address...");
200 ESP_LOGW(TAG,
"Error resolving MQTT broker IP address: %d", err);
214 ESP_LOGW(TAG,
"Couldn't resolve IP address for '%s'!", this->
credentials_.
address.c_str());
223 ESP_LOGD(TAG,
"Resolved broker IP address to %s", this->
ip_.
str().c_str());
226 #if defined(USE_ESP8266) && LWIP_VERSION_MAJOR == 1 232 if (ipaddr ==
nullptr) {
233 a_this->dns_resolve_error_ =
true;
235 a_this->ip_ = network::IPAddress(ipaddr);
236 a_this->dns_resolved_ =
true;
244 ESP_LOGI(TAG,
"Connecting to MQTT...");
249 const char *username =
nullptr;
252 const char *password =
nullptr;
284 ESP_LOGI(TAG,
"MQTT Connected!");
291 for (MQTTComponent *component : this->
children_)
292 component->schedule_resend_state();
300 const LogString *reason_s;
303 reason_s = LOG_STR(
"TCP disconnected");
306 reason_s = LOG_STR(
"Unacceptable Protocol Version");
309 reason_s = LOG_STR(
"Identifier Rejected");
312 reason_s = LOG_STR(
"Server Unavailable");
315 reason_s = LOG_STR(
"Malformed Credentials");
318 reason_s = LOG_STR(
"Not Authorized");
321 reason_s = LOG_STR(
"Not Enough Space");
324 reason_s = LOG_STR(
"TLS Bad Fingerprint");
327 reason_s = LOG_STR(
"Unknown");
331 reason_s = LOG_STR(
"WiFi disconnected");
333 ESP_LOGW(TAG,
"MQTT Disconnected: %s.", LOG_STR_ARG(reason_s));
337 const uint32_t now =
millis();
354 ESP_LOGW(TAG,
"Lost MQTT Client connection!");
368 ESP_LOGE(TAG,
"Can't connect to MQTT... Restarting...");
383 ESP_LOGV(TAG,
"subscribe(topic='%s')", topic);
386 ESP_LOGV(TAG,
"Subscribe failed for topic='%s'. Will retry later.", topic);
395 const uint32_t now =
millis();
396 bool do_resub = sub->resubscribe_timeout == 0 || now - sub->resubscribe_timeout > 1000;
399 sub->subscribed = this->
subscribe_(sub->topic.c_str(), sub->qos);
400 sub->resubscribe_timeout = now;
404 for (
auto &subscription : this->subscriptions_) {
410 MQTTSubscription subscription{
413 .callback = std::move(callback),
415 .resubscribe_timeout = 0,
418 this->subscriptions_.push_back(subscription);
422 auto f = [callback](
const std::string &topic,
const std::string &payload) {
424 callback(topic, root);
428 MQTTSubscription subscription{
433 .resubscribe_timeout = 0,
436 this->subscriptions_.push_back(subscription);
443 ESP_LOGV(TAG,
"unsubscribe(topic='%s')", topic.c_str());
446 ESP_LOGV(TAG,
"Unsubscribe failed for topic='%s'.", topic.c_str());
450 auto it = subscriptions_.begin();
451 while (it != subscriptions_.end()) {
452 if (it->topic == topic) {
453 it = subscriptions_.erase(it);
462 return this->
publish(topic, payload.data(), payload.size(), qos, retain);
467 return publish({.topic = topic, .payload = payload, .qos = qos, .retain = retain});
484 if (!logging_topic) {
486 ESP_LOGV(TAG,
"Publish(topic='%s' payload='%s' retain=%d qos=%d)", message.topic.c_str(), message.payload.c_str(),
487 message.retain, message.qos);
489 ESP_LOGV(TAG,
"Publish failed for topic='%s' (len=%u). will retry later..", message.topic.c_str(),
490 message.payload.length());
499 return this->
publish(topic, message, qos, retain);
513 static bool topic_match(
const char *message,
const char *subscription,
bool is_normal,
bool past_separator) {
515 if (*message ==
'\0' && *subscription ==
'\0')
519 if (*message ==
'\0' || *subscription ==
'\0')
522 bool do_wildcards = is_normal || past_separator;
524 if (*subscription ==
'+' && do_wildcards) {
529 while (*message !=
'\0' && *message !=
'/') {
534 return topic_match(message, subscription, is_normal,
true);
537 if (*subscription ==
'#' && do_wildcards) {
543 if (*message != *subscription)
546 past_separator = past_separator || *subscription ==
'/';
552 return topic_match(message, subscription, is_normal, past_separator);
555 static bool topic_match(
const char *message,
const char *subscription) {
556 return topic_match(message, subscription, *message !=
'\0' && *message !=
'$',
false);
563 this->
defer([
this, topic, payload]() {
565 for (
auto &subscription : this->subscriptions_) {
566 if (topic_match(topic.c_str(), subscription.topic.c_str()))
567 subscription.callback(topic, payload);
620 bool discover_ip,
bool clean) {
635 .discover_ip =
false,
658 #if ASYNC_TCP_SSL_ENABLED 661 this->
mqtt_backend_.addServerFingerprint(fingerprint.data());
672 global_mqtt_client->subscribe(
674 [
this](
const std::string &topic,
const std::string &payload) {
675 if (this->payload_.has_value() && payload != *this->payload_) {
679 this->trigger(payload);
684 ESP_LOGCONFIG(TAG,
"MQTT Message Trigger:");
685 ESP_LOGCONFIG(TAG,
" Topic: '%s'", this->topic_.c_str());
686 ESP_LOGCONFIG(TAG,
" QoS: %u", this->
qos_);
const Availability & get_availability()
bool is_discovery_enabled() const
void add_on_log_callback(std::function< void(int, const char *, const char *)> &&callback)
Register a callback that will be called for every log message sent.
void set_on_disconnect(std::function< on_disconnect_callback_t > &&callback) final
void disable_last_will()
Remove the last will testament message.
const float AFTER_CONNECTION
For components that should be initialized after a data connection (API/MQTT) is connected.
void dump_config() override
std::string topic
Empty means disabled.
const float AFTER_WIFI
For components that should be initialized after WiFi is connected.
MQTTDiscoveryUniqueIdGenerator unique_id_generator
void status_set_warning(const char *message="unspecified")
std::function< void(const std::string &, const std::string &)> mqtt_callback_t
Callback for MQTT subscriptions.
std::string client_id
The client ID. Will automatically be truncated to 23 characters.
bool unsubscribe(const char *topic) final
MQTTDiscoveryInfo discovery_info_
The discovery info options for Home Assistant.
void status_momentary_warning(const std::string &name, uint32_t length=5000)
void set_server(network::IPAddress ip, uint16_t port) final
std::vector< MQTTComponent * > children_
bool parse_json(const std::string &data, const json_parse_t &f)
Parse a JSON string and run the provided json parse function if it's valid.
void set_client_id(const char *client_id) final
void set_qos(uint8_t qos)
void defer(const std::string &name, std::function< void()> &&f)
Defer a callback to the next loop() call.
const std::string & get_friendly_name() const
Get the friendly name of this Application set by pre_setup().
MQTTMessage last_will_
The last will message.
void set_topic_prefix(const std::string &topic_prefix)
Set the topic prefix that will be prepended to all topics together with "/".
void resubscribe_subscriptions_()
bool discover_ip
Enable the Home Assistant device discovery.
std::string payload_available
std::string prefix
The Home Assistant discovery prefix. Empty means disabled.
void unsubscribe(const std::string &topic)
Unsubscribe from an MQTT topic.
void recalculate_availability_()
Re-calculate the availability property.
bool subscribe_(const char *topic, uint8_t qos)
const std::string & get_topic_prefix() const
Get the topic prefix of this device, using default if necessary.
std::function< MQTTBackend::on_connect_callback_t > mqtt_on_connect_callback_t
Callback for MQTT events.
void dump_config() override
bool is_connected()
Return whether the node is connected to the network (through wifi, eth, ...)
uint32_t IRAM_ATTR HOT millis()
void set_on_connect(mqtt_on_connect_callback_t &&callback)
MQTTBackendESP32 mqtt_backend_
void set_log_message_template(MQTTMessage &&message)
Manually set the topic used for logging.
bool connected() const final
float get_setup_priority() const override
void set_reboot_timeout(uint32_t reboot_timeout)
void set_credentials(const char *username, const char *password) final
void resubscribe_subscription_(MQTTSubscription *sub)
MQTTClientComponent * global_mqtt_client
void set_keep_alive(uint16_t keep_alive) final
network::IPAddresses get_ip_addresses()
void set_payload(const std::string &payload)
void disable_shutdown_message()
void register_mqtt_component(MQTTComponent *component)
void loop() override
Reconnect if required.
uint16_t port
The port number of the server.
bool publish(const MQTTMessage &message)
Publish a MQTTMessage.
void status_clear_warning()
void set_will(const char *topic, uint8_t qos, bool retain, const char *payload) final
MQTTDiscoveryUniqueIdGenerator
available discovery unique_id generators
std::string get_mac_address()
Get the device MAC address as a string, in lowercase hex notation.
bool is_log_message_enabled() const
static void dns_found_callback(const char *name, ip_addr_t *ipaddr, void *callback_arg)
void disable_birth_message()
Remove the birth message.
std::function< void(JsonObject)> json_build_t
Callback function typedef for building JsonObjects.
Application App
Global storage of Application pointer - only one Application can exist.
std::string get_package_import_url()
MQTTMessageTrigger(std::string topic)
void on_message(const std::string &topic, const std::string &payload)
bool subscribe(const char *topic, uint8_t qos) final
MQTTDiscoveryObjectIdGenerator
available discovery object_id generators
std::string build_json(const json_build_t &f)
Build a JSON string with the provided json build function.
void set_discovery_info(std::string &&prefix, MQTTDiscoveryUniqueIdGenerator unique_id_generator, MQTTDiscoveryObjectIdGenerator object_id_generator, bool retain, bool discover_ip, bool clean=false)
Set the Home Assistant discovery info.
void setup() override
Setup the MQTT client, registering a bunch of callbacks and attempting to connect.
const std::string & get_name() const
Get the name of this Application set by pre_setup().
void set_on_message(std::function< on_message_callback_t > &&callback) final
bool publish_json(const std::string &topic, const json::json_build_t &f, uint8_t qos=0, bool retain=false)
Construct and send a JSON MQTT message.
void disable_log_message()
Get the topic used for logging. Defaults to "<topic_prefix>/debug" and the value is cached for speed...
const MQTTDiscoveryInfo & get_discovery_info() const
Get Home Assistant discovery info.
MQTTClientDisconnectReason
std::string address
The address of the server without port number.
bool publish(const char *topic, const char *payload, size_t length, uint8_t qos, bool retain) final
float get_setup_priority() const override
MQTT client setup priority.
void set_log_level(int level)
bool is_disabled()
Return whether the network is disabled (only wifi for now)
std::string to_string(int value)
void set_last_will(MQTTMessage &&message)
Set the last will testament message.
MQTTCredentials credentials_
void IRAM_ATTR HOT yield()
MQTTMessage shutdown_message_
void set_keep_alive(uint16_t keep_alive_s)
Set the keep alive time in seconds, every 0.7*keep_alive a ping will be sent.
std::function< MQTTBackend::on_disconnect_callback_t > mqtt_on_disconnect_callback_t
void start_connect_()
Reconnect to the MQTT broker if not already connected.
MQTTMessage birth_message_
The birth message (e.g.
std::string topic_prefix_
bool can_proceed() override
void set_shutdown_message(MQTTMessage &&message)
Implementation of SPI Controller mode.
std::string payload_not_available
std::vector< MQTTSubscription > subscriptions_
MQTTDiscoveryObjectIdGenerator object_id_generator
void disable_discovery()
Globally disable Home Assistant discovery.
void subscribe_json(const std::string &topic, const mqtt_json_callback_t &callback, uint8_t qos=0)
Subscribe to a MQTT topic and automatically parse JSON payload.
bool is_discovery_ip_enabled() const
uint8_t qos
QoS. Only for last will testaments.
void add_ssl_fingerprint(const std::array< uint8_t, SHA1_SIZE > &fingerprint)
Add a SSL fingerprint to use for TCP SSL connections to the MQTT broker.
std::string payload_buffer_
void on_shutdown() override
optional< MQTTClientDisconnectReason > disconnect_reason_
uint16_t get_port() const
void set_on_disconnect(mqtt_on_disconnect_callback_t &&callback)
APIServer * global_api_server
void subscribe(const std::string &topic, mqtt_callback_t callback, uint8_t qos=0)
Subscribe to an MQTT topic and call callback when a message is received.
Availability availability_
Caches availability.
void set_birth_message(MQTTMessage &&message)
Set the birth message.
void set_on_connect(std::function< on_connect_callback_t > &&callback) final
bool retain
Whether to retain discovery messages.
void IRAM_ATTR HOT delay(uint32_t ms)
std::function< void(const std::string &, JsonObject)> mqtt_json_callback_t