From 491b84599352f4f474f55cba86995eee9c179651 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 11 Jun 2026 02:17:32 +0800 Subject: [PATCH 1/2] fix(shadowsocks): make change_ips_softly crash-safe and reload each node MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The soft IP-rotation loop inactivated a node, slept ~5 min, called change_ip(), slept ~1 min, then reactivated it. If anything between the two toggles raised (the IP-rotation incident), the reactivation was skipped and the node was left permanently inactive — which also dropped it from the next run's eligibility filter, compounding the outage. The loop also held a single in-memory Node across the ~6 min of sleeps, so a concurrent admin edit (cleared credentials, manual toggle) was silently clobbered on write-back. - Iterate PKs and reload each node inside the loop; re-check eligibility after the post-TTL sleep and skip if credentials were cleared. - Wrap the change in try/finally so is_active is always restored, even when change_ip() raises. - celery: mark node_change_ips_softly acks_late + reject_on_worker_lost so a worker killed mid-rotation requeues the task instead of dropping it (the `pkill -9 celery` recovery path from the incident). - Add NodeChangeIpsSoftlyTestCase: change_ip-raises restores is_active, happy path stays active, blank-credential node is skipped. Verified against develop's Django 3.2 tox env (8/8 incl. existing snapshot regression tests pass). Co-Authored-By: Claude Opus 4.8 --- shadowsocks_manager/shadowsocks/models.py | 31 +++++++--- shadowsocks_manager/shadowsocks/tasks.py | 7 ++- shadowsocks_manager/shadowsocks/tests.py | 73 +++++++++++++++++++++++ 3 files changed, 103 insertions(+), 8 deletions(-) diff --git a/shadowsocks_manager/shadowsocks/models.py b/shadowsocks_manager/shadowsocks/models.py index 5e95094..0aaeaf7 100644 --- a/shadowsocks_manager/shadowsocks/models.py +++ b/shadowsocks_manager/shadowsocks/models.py @@ -405,17 +405,34 @@ def change_ips_softly(cls): Updating DNS record on the fly is depends on the NameServer API which you have to set first in the domain app. This is a feature of [aws-cfn-vpn](https://github.com/alexzhangs/aws-cfn-vpn). """ - for node in cls.objects.filter( + for pk in cls.objects.filter( is_active=True, sns_endpoint__isnull=False, sns_access_key__isnull=False, sns_secret_key__isnull=False, - ): - node.toggle_active() # make the node inactive - time.sleep(300) # assuming your DNS record TTL is 300 seconds - node.change_ip() - time.sleep(60) # AWS config capture takes time - node.toggle_active() # make the node active again + ).values_list('pk', flat=True): + # Re-load each iteration so concurrent admin edits (e.g. cleared + # credentials, manual is_active toggle) aren't silently overwritten + # by stale in-memory state held across the 6-minute sleep. + node = cls.objects.get(pk=pk) + if not (node.is_active and node.sns_access_key and node.sns_secret_key): + continue + node.is_active = False + node.save() + try: + time.sleep(300) # wait for DNS TTL + node.refresh_from_db() + if not (node.sns_access_key and node.sns_secret_key): + continue + node.change_ip() + time.sleep(60) # AWS config capture + finally: + # Restore is_active regardless of what raised mid-cycle — + # otherwise an exception leaves the node stuck inactive and + # the next run sees it as ineligible. + node.refresh_from_db() + node.is_active = True + node.save() class NodeAccount(StatisticMethod): diff --git a/shadowsocks_manager/shadowsocks/tasks.py b/shadowsocks_manager/shadowsocks/tasks.py index f719359..eb897df 100644 --- a/shadowsocks_manager/shadowsocks/tasks.py +++ b/shadowsocks_manager/shadowsocks/tasks.py @@ -16,6 +16,11 @@ def port_heartbeat(): def node_change_ips(): return Node.change_ips() -@shared_task +# acks_late + reject_on_worker_lost: change_ips_softly runs for ~10 min per +# node and was the task in flight when a celery worker hung in D-state during +# the IP-rotation incident. With late acks the broker only drops the message +# after the task finishes, and reject_on_worker_lost requeues it (instead of +# silently discarding) if the worker is killed (e.g. `pkill -9 celery`) mid-run. +@shared_task(acks_late=True, reject_on_worker_lost=True) def node_change_ips_softly(): return Node.change_ips_softly() diff --git a/shadowsocks_manager/shadowsocks/tests.py b/shadowsocks_manager/shadowsocks/tests.py index 19d56c8..3241483 100644 --- a/shadowsocks_manager/shadowsocks/tests.py +++ b/shadowsocks_manager/shadowsocks/tests.py @@ -9,6 +9,7 @@ import time import botocore from abc import abstractmethod +from unittest import mock from django.test import TestCase from django.core.exceptions import ValidationError from django.core.management import call_command @@ -816,3 +817,75 @@ def test_node_no_change_no_cascade(self): na.refresh_from_db() self.assertTrue(na.is_active, 'a save that does not flip is_active should leave NodeAccount untouched') + + +class NodeChangeIpsSoftlyTestCase(AppTestCase): + """Resilience tests for Node.change_ips_softly(). + + The soft-rotation loop inactivates a node, waits for the DNS TTL, + triggers the IP change, then reactivates it. The hardening guards two + failure modes seen in the IP-rotation incident: + + 1. If change_ip() (or anything in the cycle) raises, a finally block + must restore is_active=True so the node isn't left permanently + inactive — which also drops it from the next run's eligibility + filter, compounding the outage. + 2. Each node is re-loaded and re-checked inside the loop, so a node + whose SNS credentials are blank is skipped instead of having a + rotation triggered against stale state. + """ + + def _make_node(self, name='soft-rotate-node', **kwargs): + defaults = dict( + public_ip='192.0.2.10', # TEST-NET-1, RFC 5737 + private_ip='10.255.255.254', + location='Unit Test', + sns_endpoint='arn:aws:sns:ap-northeast-1:0:topic', + sns_access_key='mock', + sns_secret_key='mock', + is_active=True, + ) + defaults.update(kwargs) + return models.Node.objects.create(name=name, **defaults) + + def test_restores_is_active_when_change_ip_raises(self): + """change_ip() failing mid-cycle must leave the node ACTIVE. + + This is the core regression: pre-fix, an exception after the node + was inactivated left is_active=False permanently. + """ + node = self._make_node() + with mock.patch('shadowsocks.models.time.sleep'), \ + mock.patch.object(models.Node, 'change_ip', + side_effect=RuntimeError('boom')) as m_change_ip: + # The exception propagates out of the loop, but the finally block + # must already have restored is_active before it does. + with self.assertRaises(RuntimeError): + models.Node.change_ips_softly() + m_change_ip.assert_called_once() + node.refresh_from_db() + self.assertTrue(node.is_active, + 'a change_ip() failure must not leave the node stuck inactive') + + def test_happy_path_keeps_node_active(self): + """On success the node is rotated once and left active.""" + node = self._make_node() + with mock.patch('shadowsocks.models.time.sleep'), \ + mock.patch.object(models.Node, 'change_ip') as m_change_ip: + models.Node.change_ips_softly() + m_change_ip.assert_called_once() + node.refresh_from_db() + self.assertTrue(node.is_active) + + def test_skips_node_with_blank_credentials(self): + """A node whose SNS credentials are blank is selected by the + not-null filter but skipped by the in-loop guard — change_ip is + never called and the node is left untouched (active).""" + node = self._make_node(name='blank-creds', + sns_access_key='', sns_secret_key='') + with mock.patch('shadowsocks.models.time.sleep'), \ + mock.patch.object(models.Node, 'change_ip') as m_change_ip: + models.Node.change_ips_softly() + m_change_ip.assert_not_called() + node.refresh_from_db() + self.assertTrue(node.is_active) From 7d225a210b9cf2f43c94a8c0240961d9ddc4f63c Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 11 Jun 2026 07:41:49 +0800 Subject: [PATCH 2/2] fix(rotation): stop change_ips_softly redelivery loop + add Slack rotation alerts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 2026 IP-rotation incident's true root cause was a Celery message redelivery loop, not only the stuck-node symptom fixed in 491b845. change_ips_softly blocks the solo worker ~6 min (5 min DNS TTL + 1 min AWS Config wait); that lapses the 120s AMQP broker heartbeat, so RabbitMQ drops the connection and redelivers the still-unacked message — turning one scheduled fire into an endless ~6-min rotation (observed redelivered=N/N). - tasks.py: node_change_ips_softly back to early-ack (acks_late=False). The prior acks_late=True + reject_on_worker_lost=True GUARANTEED the loop: a 6-min task never acks before the heartbeat lapses, so every run redelivers. Early-ack drops a single run on worker death instead of looping forever. - settings.py: CELERY_BROKER_HEARTBEAT=0 + CELERY_WORKER_PREFETCH_MULTIPLIER=1 so a long blocking task can't kill the connection or strand prefetched siblings. (Long-term follow-up: make the task non-blocking via chained apply_async(countdown=...).) - models.py: best-effort Slack notification on Node.public_ip change (SSM_SLACK_WEBHOOK_URL, with $SSM_DATA_HOME/.slack-webhook fallback) so any rotation, expected or not, is surfaced immediately. Never raises. - tests.py: redelivery-config guards (early-ack, heartbeat, prefetch) and Slack-notify behavior (fires on IP change, skips no-op/unconfigured, swallows delivery failure). Co-Authored-By: Claude Opus 4.8 --- shadowsocks_manager/shadowsocks/models.py | 54 +++++++++++++- shadowsocks_manager/shadowsocks/tasks.py | 17 +++-- shadowsocks_manager/shadowsocks/tests.py | 70 ++++++++++++++++++- .../shadowsocks_manager/settings.py | 17 +++++ 4 files changed, 150 insertions(+), 8 deletions(-) diff --git a/shadowsocks_manager/shadowsocks/models.py b/shadowsocks_manager/shadowsocks/models.py index 0aaeaf7..f91e544 100644 --- a/shadowsocks_manager/shadowsocks/models.py +++ b/shadowsocks_manager/shadowsocks/models.py @@ -1033,8 +1033,60 @@ def update_by_account(sender, instance, **kwargs): instance.on_update() +def _slack_webhook_url(): + """Return the Slack incoming-webhook URL, or '' if not configured. + + Prefers settings.SSM_SLACK_WEBHOOK_URL (from the SSM_SLACK_WEBHOOK_URL env + var); falls back to the file $SSM_DATA_HOME/.slack-webhook so a URL can be + dropped onto a running container without a restart. + """ + from django.conf import settings + import os + url = (getattr(settings, 'SSM_SLACK_WEBHOOK_URL', '') or '').strip() + if url: + return url + try: + path = os.path.join(os.getenv('SSM_DATA_HOME', '/var/local/ssm'), '.slack-webhook') + with open(path) as f: + return f.read().strip() + except Exception: + return '' + + +def _notify_ip_change(node, old_ip, new_ip): + """Best-effort Slack notification when a node's public IP changes (rotation). + + Fires for any public_ip change, scheduled or not, so an unexpected rotation + is surfaced immediately. Never raises: a notification failure must not break + Node.save(). + """ + url = _slack_webhook_url() + if not url: + return + try: + import json + import urllib.request + rec = getattr(node, 'record', None) + fqdn = ' (%s)' % rec.fqdn if rec is not None and getattr(rec, 'fqdn', None) else '' + text = ':rotating_light: VPN node *%s* IP changed: `%s` -> `%s`%s' % ( + node.name, old_ip, new_ip, fqdn) + req = urllib.request.Request( + url, data=json.dumps({'text': text}).encode('utf-8'), + headers={'Content-Type': 'application/json'}, method='POST') + urllib.request.urlopen(req, timeout=5).read() + except Exception as e: + logger.warning('slack ip-change notify failed for %s: %s', node.name, e) + + @receiver(post_save, sender=Node) -def update_by_node(sender, instance, **kwargs): +def update_by_node(sender, instance, created=False, **kwargs): + # Notify on a real public-IP change (rotation). The post-save snapshot in + # Node.save() runs AFTER this receiver, so _original_public_ip still holds + # the prior IP here and can be compared against the just-saved value. + old_ip = getattr(instance, '_original_public_ip', None) + new_ip = instance.public_ip + if not created and old_ip and new_ip and old_ip != new_ip: + _notify_ip_change(instance, old_ip, new_ip) instance.on_update() diff --git a/shadowsocks_manager/shadowsocks/tasks.py b/shadowsocks_manager/shadowsocks/tasks.py index eb897df..d4b148e 100644 --- a/shadowsocks_manager/shadowsocks/tasks.py +++ b/shadowsocks_manager/shadowsocks/tasks.py @@ -16,11 +16,16 @@ def port_heartbeat(): def node_change_ips(): return Node.change_ips() -# acks_late + reject_on_worker_lost: change_ips_softly runs for ~10 min per -# node and was the task in flight when a celery worker hung in D-state during -# the IP-rotation incident. With late acks the broker only drops the message -# after the task finishes, and reject_on_worker_lost requeues it (instead of -# silently discarding) if the worker is killed (e.g. `pkill -9 celery`) mid-run. -@shared_task(acks_late=True, reject_on_worker_lost=True) +# IMPORTANT: this task must NOT use acks_late / reject_on_worker_lost. +# change_ips_softly blocks the worker for ~6 min per node (DNS TTL + Config +# wait). With late acks the message stays unacked for the whole run; the +# blocking sleep also lapses the AMQP heartbeat (see CELERY_BROKER_HEARTBEAT +# in settings), so the broker drops the connection and REDELIVERS the still- +# unacked message — turning one scheduled fire into an endless ~6-min rotation +# loop (the 2026 IP-rotation incident; observed redelivered=N/N in the queue). +# Early ack (the default) means the message is acked at delivery: a worker +# dying mid-run loses only that single run — recovered by the next schedule and +# the is_active finally-guard in change_ips_softly — instead of looping forever. +@shared_task(acks_late=False) def node_change_ips_softly(): return Node.change_ips_softly() diff --git a/shadowsocks_manager/shadowsocks/tests.py b/shadowsocks_manager/shadowsocks/tests.py index 3241483..9ad2a95 100644 --- a/shadowsocks_manager/shadowsocks/tests.py +++ b/shadowsocks_manager/shadowsocks/tests.py @@ -10,7 +10,7 @@ import botocore from abc import abstractmethod from unittest import mock -from django.test import TestCase +from django.test import TestCase, override_settings from django.core.exceptions import ValidationError from django.core.management import call_command @@ -889,3 +889,71 @@ def test_skips_node_with_blank_credentials(self): m_change_ip.assert_not_called() node.refresh_from_db() self.assertTrue(node.is_active) + + +class NodeIpChangeSlackNotifyTestCase(AppTestCase): + """A Node.public_ip change emits a best-effort Slack notification.""" + + def _node(self, name='slack-node', **kwargs): + defaults = dict(public_ip='192.0.2.10', private_ip='10.255.255.254', + location='Unit Test', is_active=True) + defaults.update(kwargs) + return models.Node.objects.create(name=name, **defaults) + + @override_settings(SSM_SLACK_WEBHOOK_URL='https://hooks.slack.test/abc') + def test_notifies_on_public_ip_change(self): + node = self._node() + with mock.patch('urllib.request.urlopen') as m_open: + node.public_ip = '192.0.2.20' + node.save() + self.assertEqual(m_open.call_count, 1) + body = m_open.call_args[0][0].data.decode('utf-8') + self.assertIn('192.0.2.10', body) # old IP present + self.assertIn('192.0.2.20', body) # new IP present + + @override_settings(SSM_SLACK_WEBHOOK_URL='https://hooks.slack.test/abc') + def test_no_notify_when_ip_unchanged(self): + node = self._node() + with mock.patch('urllib.request.urlopen') as m_open: + node.location = 'somewhere else' # save without touching public_ip + node.save() + m_open.assert_not_called() + + def test_no_notify_when_webhook_unconfigured(self): + node = self._node() + with mock.patch('shadowsocks.models._slack_webhook_url', return_value=''), \ + mock.patch('urllib.request.urlopen') as m_open: + node.public_ip = '192.0.2.30' + node.save() + m_open.assert_not_called() + + @override_settings(SSM_SLACK_WEBHOOK_URL='https://hooks.slack.test/abc') + def test_notify_failure_does_not_break_save(self): + node = self._node() + with mock.patch('urllib.request.urlopen', side_effect=OSError('network down')): + node.public_ip = '192.0.2.40' + node.save() # must not raise + node.refresh_from_db() + self.assertEqual(node.public_ip, '192.0.2.40') + + +class CeleryRotationTaskConfigTestCase(TestCase): + """Guard the broker/task settings that prevent the redelivery loop. + + change_ips_softly blocks the worker ~6 min; with late acks or a live AMQP + heartbeat the broker drops the connection mid-run and redelivers the message, + looping forever (the 2026 IP-rotation incident). + """ + + def test_change_ips_softly_uses_early_ack(self): + from shadowsocks.tasks import node_change_ips_softly + self.assertFalse(node_change_ips_softly.acks_late, + 'node_change_ips_softly must early-ack: late-ack on a ~6 min blocking ' + 'task causes infinite broker redelivery') + + def test_broker_heartbeat_disabled_and_single_prefetch(self): + from django.conf import settings + self.assertEqual(settings.CELERY_BROKER_HEARTBEAT, 0, + 'a non-zero heartbeat lapses during the long blocking task and triggers ' + 'connection-drop redelivery') + self.assertEqual(settings.CELERY_WORKER_PREFETCH_MULTIPLIER, 1) diff --git a/shadowsocks_manager/shadowsocks_manager/settings.py b/shadowsocks_manager/shadowsocks_manager/settings.py index 1e3b3f3..307381b 100644 --- a/shadowsocks_manager/shadowsocks_manager/settings.py +++ b/shadowsocks_manager/shadowsocks_manager/settings.py @@ -340,3 +340,20 @@ def __repr__(self): CELERY_CACHE_BACKEND = 'django-cache' CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' CELERY_BROKER_URL = 'amqp://guest:guest@{}:{}//'.format(RABBITMQ_HOST, RABBITMQ_PORT) + +# shadowsocks.tasks.node_change_ips_softly blocks the worker ~6 min per node +# (5 min DNS TTL + 1 min AWS Config wait). The default 120s AMQP heartbeat would +# lapse during that blocking sleep, so the broker drops the connection and +# REDELIVERS the in-flight message — turning one scheduled rotation into an +# endless ~6-min loop (the 2026 IP-rotation incident). Disable the heartbeat so a +# long blocking task can't kill the connection, and prefetch one task at a time so +# there are no prefetched siblings to requeue either. +# (Proper long-term fix: make change_ips_softly non-blocking via chained tasks +# with apply_async(countdown=...). Tracked as a follow-up.) +CELERY_BROKER_HEARTBEAT = 0 +CELERY_WORKER_PREFETCH_MULTIPLIER = 1 + +# Optional Slack incoming-webhook URL for IP-rotation alerts (empty => disabled). +# Falls back to the file $SSM_DATA_HOME/.slack-webhook when this env var is unset, +# so a URL can be dropped onto a running container without a restart. +SSM_SLACK_WEBHOOK_URL = config('SSM_SLACK_WEBHOOK_URL', default='')