diff --git a/shadowsocks_manager/shadowsocks/models.py b/shadowsocks_manager/shadowsocks/models.py index 5e95094..f91e544 100644 --- a/shadowsocks_manager/shadowsocks/models.py +++ b/shadowsocks_manager/shadowsocks/models.py @@ -405,17 +405,34 @@ def change_ips_softly(cls): Updating DNS record on the fly is depends on the NameServer API which you have to set first in the domain app. This is a feature of [aws-cfn-vpn](https://github.com/alexzhangs/aws-cfn-vpn). """ - for node in cls.objects.filter( + for pk in cls.objects.filter( is_active=True, sns_endpoint__isnull=False, sns_access_key__isnull=False, sns_secret_key__isnull=False, - ): - node.toggle_active() # make the node inactive - time.sleep(300) # assuming your DNS record TTL is 300 seconds - node.change_ip() - time.sleep(60) # AWS config capture takes time - node.toggle_active() # make the node active again + ).values_list('pk', flat=True): + # Re-load each iteration so concurrent admin edits (e.g. cleared + # credentials, manual is_active toggle) aren't silently overwritten + # by stale in-memory state held across the 6-minute sleep. + node = cls.objects.get(pk=pk) + if not (node.is_active and node.sns_access_key and node.sns_secret_key): + continue + node.is_active = False + node.save() + try: + time.sleep(300) # wait for DNS TTL + node.refresh_from_db() + if not (node.sns_access_key and node.sns_secret_key): + continue + node.change_ip() + time.sleep(60) # AWS config capture + finally: + # Restore is_active regardless of what raised mid-cycle — + # otherwise an exception leaves the node stuck inactive and + # the next run sees it as ineligible. + node.refresh_from_db() + node.is_active = True + node.save() class NodeAccount(StatisticMethod): @@ -1016,8 +1033,60 @@ def update_by_account(sender, instance, **kwargs): instance.on_update() +def _slack_webhook_url(): + """Return the Slack incoming-webhook URL, or '' if not configured. + + Prefers settings.SSM_SLACK_WEBHOOK_URL (from the SSM_SLACK_WEBHOOK_URL env + var); falls back to the file $SSM_DATA_HOME/.slack-webhook so a URL can be + dropped onto a running container without a restart. + """ + from django.conf import settings + import os + url = (getattr(settings, 'SSM_SLACK_WEBHOOK_URL', '') or '').strip() + if url: + return url + try: + path = os.path.join(os.getenv('SSM_DATA_HOME', '/var/local/ssm'), '.slack-webhook') + with open(path) as f: + return f.read().strip() + except Exception: + return '' + + +def _notify_ip_change(node, old_ip, new_ip): + """Best-effort Slack notification when a node's public IP changes (rotation). + + Fires for any public_ip change, scheduled or not, so an unexpected rotation + is surfaced immediately. Never raises: a notification failure must not break + Node.save(). + """ + url = _slack_webhook_url() + if not url: + return + try: + import json + import urllib.request + rec = getattr(node, 'record', None) + fqdn = ' (%s)' % rec.fqdn if rec is not None and getattr(rec, 'fqdn', None) else '' + text = ':rotating_light: VPN node *%s* IP changed: `%s` -> `%s`%s' % ( + node.name, old_ip, new_ip, fqdn) + req = urllib.request.Request( + url, data=json.dumps({'text': text}).encode('utf-8'), + headers={'Content-Type': 'application/json'}, method='POST') + urllib.request.urlopen(req, timeout=5).read() + except Exception as e: + logger.warning('slack ip-change notify failed for %s: %s', node.name, e) + + @receiver(post_save, sender=Node) -def update_by_node(sender, instance, **kwargs): +def update_by_node(sender, instance, created=False, **kwargs): + # Notify on a real public-IP change (rotation). The post-save snapshot in + # Node.save() runs AFTER this receiver, so _original_public_ip still holds + # the prior IP here and can be compared against the just-saved value. + old_ip = getattr(instance, '_original_public_ip', None) + new_ip = instance.public_ip + if not created and old_ip and new_ip and old_ip != new_ip: + _notify_ip_change(instance, old_ip, new_ip) instance.on_update() diff --git a/shadowsocks_manager/shadowsocks/tasks.py b/shadowsocks_manager/shadowsocks/tasks.py index f719359..d4b148e 100644 --- a/shadowsocks_manager/shadowsocks/tasks.py +++ b/shadowsocks_manager/shadowsocks/tasks.py @@ -16,6 +16,16 @@ def port_heartbeat(): def node_change_ips(): return Node.change_ips() -@shared_task +# IMPORTANT: this task must NOT use acks_late / reject_on_worker_lost. +# change_ips_softly blocks the worker for ~6 min per node (DNS TTL + Config +# wait). With late acks the message stays unacked for the whole run; the +# blocking sleep also lapses the AMQP heartbeat (see CELERY_BROKER_HEARTBEAT +# in settings), so the broker drops the connection and REDELIVERS the still- +# unacked message — turning one scheduled fire into an endless ~6-min rotation +# loop (the 2026 IP-rotation incident; observed redelivered=N/N in the queue). +# Early ack (the default) means the message is acked at delivery: a worker +# dying mid-run loses only that single run — recovered by the next schedule and +# the is_active finally-guard in change_ips_softly — instead of looping forever. +@shared_task(acks_late=False) def node_change_ips_softly(): return Node.change_ips_softly() diff --git a/shadowsocks_manager/shadowsocks/tests.py b/shadowsocks_manager/shadowsocks/tests.py index 19d56c8..9ad2a95 100644 --- a/shadowsocks_manager/shadowsocks/tests.py +++ b/shadowsocks_manager/shadowsocks/tests.py @@ -9,7 +9,8 @@ import time import botocore from abc import abstractmethod -from django.test import TestCase +from unittest import mock +from django.test import TestCase, override_settings from django.core.exceptions import ValidationError from django.core.management import call_command @@ -816,3 +817,143 @@ def test_node_no_change_no_cascade(self): na.refresh_from_db() self.assertTrue(na.is_active, 'a save that does not flip is_active should leave NodeAccount untouched') + + +class NodeChangeIpsSoftlyTestCase(AppTestCase): + """Resilience tests for Node.change_ips_softly(). + + The soft-rotation loop inactivates a node, waits for the DNS TTL, + triggers the IP change, then reactivates it. The hardening guards two + failure modes seen in the IP-rotation incident: + + 1. If change_ip() (or anything in the cycle) raises, a finally block + must restore is_active=True so the node isn't left permanently + inactive — which also drops it from the next run's eligibility + filter, compounding the outage. + 2. Each node is re-loaded and re-checked inside the loop, so a node + whose SNS credentials are blank is skipped instead of having a + rotation triggered against stale state. + """ + + def _make_node(self, name='soft-rotate-node', **kwargs): + defaults = dict( + public_ip='192.0.2.10', # TEST-NET-1, RFC 5737 + private_ip='10.255.255.254', + location='Unit Test', + sns_endpoint='arn:aws:sns:ap-northeast-1:0:topic', + sns_access_key='mock', + sns_secret_key='mock', + is_active=True, + ) + defaults.update(kwargs) + return models.Node.objects.create(name=name, **defaults) + + def test_restores_is_active_when_change_ip_raises(self): + """change_ip() failing mid-cycle must leave the node ACTIVE. + + This is the core regression: pre-fix, an exception after the node + was inactivated left is_active=False permanently. + """ + node = self._make_node() + with mock.patch('shadowsocks.models.time.sleep'), \ + mock.patch.object(models.Node, 'change_ip', + side_effect=RuntimeError('boom')) as m_change_ip: + # The exception propagates out of the loop, but the finally block + # must already have restored is_active before it does. + with self.assertRaises(RuntimeError): + models.Node.change_ips_softly() + m_change_ip.assert_called_once() + node.refresh_from_db() + self.assertTrue(node.is_active, + 'a change_ip() failure must not leave the node stuck inactive') + + def test_happy_path_keeps_node_active(self): + """On success the node is rotated once and left active.""" + node = self._make_node() + with mock.patch('shadowsocks.models.time.sleep'), \ + mock.patch.object(models.Node, 'change_ip') as m_change_ip: + models.Node.change_ips_softly() + m_change_ip.assert_called_once() + node.refresh_from_db() + self.assertTrue(node.is_active) + + def test_skips_node_with_blank_credentials(self): + """A node whose SNS credentials are blank is selected by the + not-null filter but skipped by the in-loop guard — change_ip is + never called and the node is left untouched (active).""" + node = self._make_node(name='blank-creds', + sns_access_key='', sns_secret_key='') + with mock.patch('shadowsocks.models.time.sleep'), \ + mock.patch.object(models.Node, 'change_ip') as m_change_ip: + models.Node.change_ips_softly() + m_change_ip.assert_not_called() + node.refresh_from_db() + self.assertTrue(node.is_active) + + +class NodeIpChangeSlackNotifyTestCase(AppTestCase): + """A Node.public_ip change emits a best-effort Slack notification.""" + + def _node(self, name='slack-node', **kwargs): + defaults = dict(public_ip='192.0.2.10', private_ip='10.255.255.254', + location='Unit Test', is_active=True) + defaults.update(kwargs) + return models.Node.objects.create(name=name, **defaults) + + @override_settings(SSM_SLACK_WEBHOOK_URL='https://hooks.slack.test/abc') + def test_notifies_on_public_ip_change(self): + node = self._node() + with mock.patch('urllib.request.urlopen') as m_open: + node.public_ip = '192.0.2.20' + node.save() + self.assertEqual(m_open.call_count, 1) + body = m_open.call_args[0][0].data.decode('utf-8') + self.assertIn('192.0.2.10', body) # old IP present + self.assertIn('192.0.2.20', body) # new IP present + + @override_settings(SSM_SLACK_WEBHOOK_URL='https://hooks.slack.test/abc') + def test_no_notify_when_ip_unchanged(self): + node = self._node() + with mock.patch('urllib.request.urlopen') as m_open: + node.location = 'somewhere else' # save without touching public_ip + node.save() + m_open.assert_not_called() + + def test_no_notify_when_webhook_unconfigured(self): + node = self._node() + with mock.patch('shadowsocks.models._slack_webhook_url', return_value=''), \ + mock.patch('urllib.request.urlopen') as m_open: + node.public_ip = '192.0.2.30' + node.save() + m_open.assert_not_called() + + @override_settings(SSM_SLACK_WEBHOOK_URL='https://hooks.slack.test/abc') + def test_notify_failure_does_not_break_save(self): + node = self._node() + with mock.patch('urllib.request.urlopen', side_effect=OSError('network down')): + node.public_ip = '192.0.2.40' + node.save() # must not raise + node.refresh_from_db() + self.assertEqual(node.public_ip, '192.0.2.40') + + +class CeleryRotationTaskConfigTestCase(TestCase): + """Guard the broker/task settings that prevent the redelivery loop. + + change_ips_softly blocks the worker ~6 min; with late acks or a live AMQP + heartbeat the broker drops the connection mid-run and redelivers the message, + looping forever (the 2026 IP-rotation incident). + """ + + def test_change_ips_softly_uses_early_ack(self): + from shadowsocks.tasks import node_change_ips_softly + self.assertFalse(node_change_ips_softly.acks_late, + 'node_change_ips_softly must early-ack: late-ack on a ~6 min blocking ' + 'task causes infinite broker redelivery') + + def test_broker_heartbeat_disabled_and_single_prefetch(self): + from django.conf import settings + self.assertEqual(settings.CELERY_BROKER_HEARTBEAT, 0, + 'a non-zero heartbeat lapses during the long blocking task and triggers ' + 'connection-drop redelivery') + self.assertEqual(settings.CELERY_WORKER_PREFETCH_MULTIPLIER, 1) diff --git a/shadowsocks_manager/shadowsocks_manager/settings.py b/shadowsocks_manager/shadowsocks_manager/settings.py index 1e3b3f3..307381b 100644 --- a/shadowsocks_manager/shadowsocks_manager/settings.py +++ b/shadowsocks_manager/shadowsocks_manager/settings.py @@ -340,3 +340,20 @@ def __repr__(self): CELERY_CACHE_BACKEND = 'django-cache' CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' CELERY_BROKER_URL = 'amqp://guest:guest@{}:{}//'.format(RABBITMQ_HOST, RABBITMQ_PORT) + +# shadowsocks.tasks.node_change_ips_softly blocks the worker ~6 min per node +# (5 min DNS TTL + 1 min AWS Config wait). The default 120s AMQP heartbeat would +# lapse during that blocking sleep, so the broker drops the connection and +# REDELIVERS the in-flight message — turning one scheduled rotation into an +# endless ~6-min loop (the 2026 IP-rotation incident). Disable the heartbeat so a +# long blocking task can't kill the connection, and prefetch one task at a time so +# there are no prefetched siblings to requeue either. +# (Proper long-term fix: make change_ips_softly non-blocking via chained tasks +# with apply_async(countdown=...). Tracked as a follow-up.) +CELERY_BROKER_HEARTBEAT = 0 +CELERY_WORKER_PREFETCH_MULTIPLIER = 1 + +# Optional Slack incoming-webhook URL for IP-rotation alerts (empty => disabled). +# Falls back to the file $SSM_DATA_HOME/.slack-webhook when this env var is unset, +# so a URL can be dropped onto a running container without a restart. +SSM_SLACK_WEBHOOK_URL = config('SSM_SLACK_WEBHOOK_URL', default='')