In this article, I want to cover a simplified but working example of a decentralized blockchain based on the Proof of Work algorithm: some sort of simplified Bitcoin.
What I will describe:
- PoW blockchain theory
- How to write a blockchain in Python from scratch
- How to write a Full Node and create decentralized nodes to run the blockchain on
You can find the code in my GitHub: https://github.com/creotiv/full_node_blockchain
First, you should understand that Blockchain and cryptocurrencies are not the same thing. Blockchain is a cryptographic algorithm for storing data in a decentralized way when you can't trust anyone. Cryptocurrencies are only a small part of all the projects that use Blockchain.
Second, the idea of decentralized data storage and processing lies in the ability to create conditions where the system can be controlled only by the majority of its participants, not by just some of them. This blocks people, parties, and countries from enforcing their rules on the users of the system: basically some sort of free speech, but for internet services.
Proof of Work is one of the oldest consensus algorithms used in a Blockchain. Consensus is how decentralized actors/nodes agree on something that happens in the system, like a data update.
So for us, a Blockchain is a decentralized database where transactions update its state and a consensus algorithm lets nodes agree that an update is valid. That's what a Blockchain is from a global perspective.
Transactions
So assume we have many nodes, each of which has a current data state. We need some way to update that state and preserve the order of our updates (in most cases, like money transfers, order matters).
We have 3 ways of recording our transactions: save the new data state in the transaction, save the diff of states (+$100/-$100), or save the state flow.
With the first one we can't verify the chain of our modifications in case of an error or attack. Even current centralized financial solutions don't save just your current money balance; instead they save a log of all updates to your balance, from which your current balance is calculated.
That's why most (I didn't see them all :) ) blockchain solutions use the diff-of-state (Ethereum) or state-flow pattern (Bitcoin) to describe transactions.
Here is how state flow looks:
Each Transaction has Inputs and Outputs, where each Input points to a previous Output. Each Output can be used as an Input only once.
Each Input is signed by the private key of the user's wallet to prove that they are the owner of that Output.
So for example, you have one Output from an exchange saying that you bought 10 coins to your wallet, and now you transfer 5 coins to your friend's wallet. That leaves 5 coins from the previous Output unspent; if you leave them like this, the system will count them as a fee for processing your transaction. To not pay a fee, you can add an additional Output that moves the change back to your own wallet.
That’s the idea behind the transactions in a bitcoin-like system.
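The input/output flow above can be sketched as a toy UTXO model in plain Python (the dictionary-based `utxo` set and the `make_tx` helper below are illustrative only, not the article's classes):

```python
from hashlib import sha256

# toy UTXO set: (tx_hash, output_index) -> (owner, amount)
utxo = {("EXCHANGE_TX", 0): ("alice", 10)}

def make_tx(inputs, outputs):
    """inputs: list of (tx_hash, index) keys; outputs: list of (owner, amount)."""
    tx_hash = sha256(repr((inputs, outputs)).encode()).hexdigest()
    amount_in = sum(utxo[i][1] for i in inputs)
    amount_out = sum(a for _, a in outputs)
    fee = amount_in - amount_out          # anything not re-spent is the fee
    assert fee >= 0, "insufficient funds"
    for i in inputs:                      # each output can be spent only once
        del utxo[i]
    for idx, (owner, amount) in enumerate(outputs):
        utxo[(tx_hash, idx)] = (owner, amount)
    return tx_hash, fee

# Alice sends 5 coins to Bob and returns 5 to herself as change, so the fee is 0
tx, fee = make_tx([("EXCHANGE_TX", 0)], [("bob", 5), ("alice", 5)])
```

Dropping the change output here would leave `fee == 5`, exactly the "forgotten coins become the fee" behavior described above.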
But now we need to add some order to Txs, as we have a decentralized system and data can arrive out of order.
We can't use timestamps like other systems do, because time is not trusted data. That's where Blocks come in, from which the name Blockchain came.
Blocks
Blocks add an order for our transactions. All Txs that go into the same block are considered to run at the same time.
Here is how they look:
Each block consists of data (a list of transactions), the hash of the block (obtained from mining), a link to the previous block's hash, and some additional data like the time, nonce, and block index.
The first transaction in a block is a COINBASE transaction, which doesn't have a previous hash and is used to pay the reward for mining.
Each block hash is constructed from the Tx hashes, the previous block hash, the nonce, and other data. Thus you can't replace a block inside the chain: the only way to change data anywhere in the blockchain is to recompute the hashes of all later blocks. And that's where the Proof of Work algorithm comes into play.
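The chaining described here can be illustrated with a few lines of hashlib (a toy model: the real block hash in this project also commits to the Merkle root, index, and timestamp):

```python
from hashlib import sha256

def block_hash(prev_hash, txs, nonce=0):
    # the block hash commits to the previous hash, so every block
    # depends on the entire history before it
    return sha256(f"{prev_hash}{txs}{nonce}".encode()).hexdigest()

chain = []
prev = "0" * 64                       # genesis has no predecessor
for txs in (["a->b:5"], ["b->c:2"], ["c->a:1"]):
    prev = block_hash(prev, txs)
    chain.append(prev)

# tampering with block 0 invalidates every later hash
tampered = block_hash("0" * 64, ["a->b:500"])
recomputed = block_hash(tampered, ["b->c:2"])
```

Since `tampered` differs from `chain[0]`, the recomputed second block no longer matches `chain[1]`, and so on for every block after it.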
Proof Of Work algorithm
Since recomputing hashes by itself is not a problem, we need some mechanism to make it infeasible. PoW is one such mechanism.
Instead of accepting any hash for our block, PoW sets rules on which hashes we can consider valid. Such rules add a level of difficulty to calculating the hash: it now takes many iterations to find a valid one, and thus time and resources.
From that point we see that attackers can't modify a running blockchain without controlling at least 51% of all the resources used in it. Don't forget that blocks are mined in a decentralized way by thousands of different nodes.
Mining
Mining is the process of finding a hash for a Block that the system will consider valid.
If you look at the block description you will find the nonce field. That field is used to change the hash during mining, since we can't change the block data itself.
That works because even a small modification to the data makes the hash totally different.
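This avalanche effect is easy to see directly (the data strings are arbitrary; on average only about 4 of 64 hex characters coincide by chance):

```python
from hashlib import sha256

# two inputs that differ by a single character
h1 = sha256(b"block data nonce=1").hexdigest()
h2 = sha256(b"block data nonce=2").hexdigest()

# count positions where the two 64-character digests happen to agree
same = sum(a == b for a, b in zip(h1, h2))
```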
For each mined Block the miner receives some reward plus all the fees from its Txs.
Every some number of blocks, the reward and the difficulty rules for the hash may change. That's how the total supply of coins is limited.
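A minimal mining loop, using the same target convention as the code later in this article (`2 ** (256 - difficulty)`); the data string and difficulty value here are arbitrary:

```python
from hashlib import sha256

def mine(block_data, difficulty):
    """Try nonces until the hash falls below the target.
    Higher difficulty means a smaller target and more iterations on average."""
    target = 2 ** (256 - difficulty)
    nonce = 0
    while True:
        h = sha256(f"{block_data}{nonce}".encode()).hexdigest()
        if int(h, 16) <= target:
            return nonce, h
        nonce += 1

# difficulty 12 needs about 2**12 ~ 4096 tries on average
nonce, h = mine("txs+prev_hash", difficulty=12)
```

Verifying the result is one hash operation; finding it takes thousands. That asymmetry is the whole point of Proof of Work.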
Merkle Tree
A Merkle Tree is an algorithm for hashing data in a tree structure, which makes it possible to update the resulting hash without recomputing the whole tree when new data is added.
It is used to hash the list of transactions in a Block, saving resources during mining by removing the need to recompute all the Tx hashes each time.
It also makes it possible to check whether some data is in the tree with much less computation.
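A toy Merkle root computation (one common convention for odd levels is used here, promoting the leftover node unchanged; Bitcoin instead duplicates it):

```python
from hashlib import sha256

def merkle_root(leaves):
    """Hash the leaves, then repeatedly hash adjacent pairs
    until a single root hash remains."""
    level = [sha256(leaf.encode()).hexdigest() for leaf in leaves]
    while len(level) > 1:
        nxt = [
            sha256((level[i] + level[i + 1]).encode()).hexdigest()
            for i in range(0, len(level) - 1, 2)
        ]
        if len(level) % 2:            # odd leftover node carried up unchanged
            nxt.append(level[-1])
        level = nxt
    return level[0]

root = merkle_root(["tx1", "tx2", "tx3"])
```

Changing any single transaction changes the root, but only the hashes on that transaction's path to the root need recomputing, not the whole tree.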
System together
We have some number of nodes, each holding a duplicate of the whole data of the blockchain (such nodes are called Full Nodes).
Nodes use the gossip communication principle: if we get data that we haven't seen before, we broadcast it to the other nodes that we know. In this way data like Txs, blocks, new node addresses, etc., is shared across all nodes.
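The gossip principle can be sketched with in-memory nodes (a toy fully connected mesh; the real nodes talk over HTTP):

```python
class Node:
    """Toy gossip node: rebroadcast only data it has not seen before,
    which stops broadcast loops."""
    def __init__(self, name):
        self.name = name
        self.peers = []
        self.seen = set()

    def receive(self, item):
        if item in self.seen:         # duplicate: drop, do not rebroadcast
            return
        self.seen.add(item)
        for peer in self.peers:       # forward new data to every known peer
            peer.receive(item)

nodes = [Node(i) for i in range(4)]
for n in nodes:
    n.peers = [p for p in nodes if p is not n]

nodes[0].receive("tx:alice->bob:5")   # one injection reaches every node
```

The seen-set is what prevents an infinite broadcast loop; the full node below achieves the same effect by skipping duplicates and excluding the sender from rebroadcasts.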
When some time has passed, or some number of Txs has accumulated since the last block was added (mined), nodes start mining a new block concurrently. The first one to mine it adds it to its chain and shares it with the other nodes; they validate it and, if it's OK, add it too and broadcast it further.
In some situations a Split Brain may occur, when two nodes mine two different blocks at roughly the same time. In that case some nodes continue mining the next Block on top of the first block and others on top of the second. The first chain to become longer wins and is kept as the blockchain.
Problems
As we can see, if the 3 biggest mining pools merged, they would have more than 51% of the mining resources, which would give them the ability to compromise the network. This also shows that Bitcoin is not as decentralized a solution as many people think.
You can proceed straight to the code: https://github.com/creotiv/full_node_blockchain
Project structure:
I tried not to add hard things that are not directly related to the blockchain, to minimize the amount of code so you don't get lost in it. Also, nodes don't save their state on shutdown.
What I covered in this demo:
What I didn't cover in this demo:
Wallet
import rsa
import binascii
class Address:
def __init__(self, addr):
if isinstance(addr, rsa.PublicKey):
self.addr = addr
else:
if isinstance(addr,str):
addr = addr.encode()
# that's not clean, but I didn't find a simple crypto library for a 512-bit SHA key
# to keep the address/public key short.
self.addr = rsa.PublicKey.load_pkcs1(b'-----BEGIN RSA PUBLIC KEY-----\n%b\n-----END RSA PUBLIC KEY-----\n' % addr)
def __str__(self):
return b''.join(self.addr.save_pkcs1().split(b'\n')[1:-2]).decode()
@property
def key(self):
return self.addr
class Wallet:
'''For a real-world wallet, use ECDSA cryptography'''
__slots__ = '_pub', '_priv'
def __init__(self, pub=None, priv=None):
if pub:
self._pub = Address(pub)
self._priv = rsa.PrivateKey.load_pkcs1(priv)
@classmethod
def create(cls):
inst = cls(b'',b'')
_pub, _priv = rsa.newkeys(512)
inst._pub = Address(_pub)
inst._priv = _priv
return inst
@classmethod
def verify(cls, data, signature, address):
signature = binascii.unhexlify(signature.encode())
if not isinstance(address, Address):
address = Address(address)
try:
return rsa.verify(data, signature, address.key) == 'SHA-256'
except rsa.VerificationError:
return False
@property
def address(self):
return str(self._pub)
@property
def priv(self):
return self._priv.save_pkcs1()
def sign(self, hash):
return binascii.hexlify(rsa.sign(hash, self._priv, 'SHA-256')).decode()
We create a small wrapper around the RSA Python library. A Wallet consists of 2 keys: public and private. The public key is our blockchain address, and it is used to verify signatures on data that we make with our private key (which is not shared with anyone).
In Bitcoin and other solutions, ECDSA cryptography is used instead of RSA.
Blocks
import time
from hashlib import sha256
from merkletools import MerkleTools
from .wallet import Address
class Input:
__slots__ = 'prev_tx_hash', 'output_index', 'signature', '_hash', 'address', 'index', 'amount'
def __init__(self, prev_tx_hash, output_index, address, index=0):
self.prev_tx_hash = prev_tx_hash
self.output_index = output_index
self.address = address
self.index = index
self._hash = None
self.signature = None
self.amount = None
def sign(self, wallet):
hash_string = '{}{}{}{}'.format(
self.prev_tx_hash, self.output_index, self.address, self.index
).encode()
self.signature = wallet.sign(hash_string)
@property
def hash(self):
if self._hash:
return self._hash
if not self.signature and self.prev_tx_hash != 'COINBASE':
raise Exception('Sign the input first')
hash_string = '{}{}{}{}{}'.format(
self.prev_tx_hash, self.output_index, self.address, self.signature, self.index
)
self._hash = sha256(sha256(hash_string.encode()).hexdigest().encode('utf8')).hexdigest()
return self._hash
@property
def as_dict(self):
return {
"prev_tx_hash":self.prev_tx_hash,
"output_index":self.output_index,
"address":str(self.address),
"index":self.index,
"hash":self.hash,
"signature":self.signature
}
@classmethod
def from_dict(cls, data):
inst = cls(
data['prev_tx_hash'],
data['output_index'],
Address(data['address']),
data['index'],
)
inst.signature = data['signature']
inst._hash = None
return inst
class Output:
__slots__ = '_hash', 'address', 'index', 'amount', 'input_hash'
def __init__(self, address, amount, index=0):
self.address = address
self.index = index
self.amount = int(amount)
# I use the input hash here to make the output hash unique, especially for the COINBASE tx
self.input_hash = None
self._hash = None
@property
def hash(self):
if self._hash:
return self._hash
hash_string = '{}{}{}{}'.format(
self.amount, self.index, self.address, self.input_hash
)
self._hash = sha256(sha256(hash_string.encode()).hexdigest().encode('utf8')).hexdigest()
return self._hash
@property
def as_dict(self):
return {
"amount":int(self.amount),
"address":str(self.address),
"index":self.index,
"input_hash": self.input_hash,
"hash":self.hash
}
@classmethod
def from_dict(cls, data):
inst = cls(
Address(data['address']),
data['amount'],
data['index'],
)
inst.input_hash = data['input_hash']
inst._hash = None
return inst
class Tx:
__slots__ = 'inputs', 'outputs', 'timestamp', '_hash'
def __init__(self, inputs, outputs, timestamp=None):
self.inputs = inputs
self.outputs = outputs
self.timestamp = timestamp or int(time.time())
self._hash = None
@property
def hash(self):
if self._hash:
return self._hash
# calculating input_hash for outputs
inp_hash = sha256((str([el.as_dict for el in self.inputs]) + str(self.timestamp)).encode()).hexdigest()
for el in self.outputs:
el.input_hash = inp_hash
hash_string = '{}{}{}'.format(
[el.as_dict for el in self.inputs], [el.as_dict for el in self.outputs], self.timestamp
)
self._hash = sha256(sha256(hash_string.encode()).hexdigest().encode('utf8')).hexdigest()
return self._hash
@property
def as_dict(self):
inp_hash = sha256((str([el.as_dict for el in self.inputs]) + str(self.timestamp)).encode()).hexdigest()
for el in self.outputs:
el.input_hash = inp_hash
return {
"inputs":[el.as_dict for el in self.inputs],
"outputs":[el.as_dict for el in self.outputs],
"timestamp":self.timestamp,
"hash":self.hash
}
@classmethod
def from_dict(cls, data):
inps = [Input.from_dict(el) for el in data['inputs']]
outs = [Output.from_dict(el) for el in data['outputs']]
inp_hash = sha256((str([el.as_dict for el in inps]) + str(data['timestamp'])).encode()).hexdigest()
for el in outs:
el.input_hash = inp_hash
inst = cls(
inps,
outs,
data['timestamp'],
)
inst._hash = None
return inst
class Block:
__slots__ = 'nonce', 'prev_hash', 'index', 'txs', 'timestamp', 'merkel_root'
def __init__(self, txs, index, prev_hash, timestamp=None, nonce=0):
self.txs = txs or []
self.prev_hash = prev_hash
self.index = index
self.nonce = nonce
self.timestamp = timestamp or int(time.time())
self.merkel_root = None
def build_merkel_tree(self):
"""
The Merkle Tree is used to hash all the transactions, so mining doesn't recompute every Tx hash each time,
which makes things much faster.
A tree is used because we can append new Txs and rebuild the root hash much faster while building
the block before mining it.
"""
if self.merkel_root:
return self.merkel_root
mt = MerkleTools(hash_type="SHA256")
for el in self.txs:
mt.add_leaf(el.hash)
mt.make_tree()
self.merkel_root = mt.get_merkle_root()
return self.merkel_root
def hash(self, nonce=None):
if nonce is not None:
self.nonce = nonce
block_string = '{}{}{}{}{}'.format(
self.build_merkel_tree(), self.prev_hash, self.index, self.nonce, self.timestamp
)
return sha256(sha256(block_string.encode()).hexdigest().encode('utf8')).hexdigest()
@property
def as_dict(self):
return {
"index": self.index,
"timestamp": self.timestamp,
"prev_hash": self.prev_hash,
"hash": self.hash(),
"txs": [el.as_dict for el in self.txs],
"nonce": self.nonce,
"merkel_root":self.merkel_root
}
@classmethod
def from_dict(cls, data):
return cls(
[Tx.from_dict(el) for el in data['txs']],
data['index'],
data['prev_hash'],
data['timestamp'],
data['nonce']
)
In blocks.py we describe our blockchain building blocks: Tx, Input, Output, and Block. Each class has hash, as_dict, and from_dict methods.
We sign each Input with our Wallet instance.
The Output class has an input_hash field that is used to create a unique hash for each output in a transaction; without it, many outputs (especially COINBASE ones) would hash identically.
As I said before, we use the Merkle Tree algorithm to hash all the transactions in a block to speed up mining.
Verifiers
import rsa
import binascii
from .wallet import Address
class TxVerifier:
def __init__(self, db):
self.db = db
def verify(self, inputs, outputs):
total_amount_in = 0
total_amount_out = 0
for i,inp in enumerate(inputs):
if inp.prev_tx_hash == 'COINBASE' and i == 0:
total_amount_in = int(self.db.config['mining_reward'])
continue
try:
out = self.db.transaction_by_hash[inp.prev_tx_hash]['outputs'][inp.output_index]
except KeyError:
raise Exception('Transaction output not found.')
total_amount_in += int(out['amount'])
if (inp.prev_tx_hash,out['hash']) not in self.db.unspent_txs_by_user_hash.get(out['address'], set()):
raise Exception('Output of transaction already spent.')
hash_string = '{}{}{}{}'.format(
inp.prev_tx_hash, inp.output_index, inp.address, inp.index
)
try:
rsa.verify(hash_string.encode(), binascii.unhexlify(inp.signature.encode()), Address(out['address']).key) == 'SHA-256'
except Exception:
raise Exception('Signature verification failed: %s' % inp.as_dict)
for out in outputs:
total_amount_out += int(out.amount)
if total_amount_in < total_amount_out:
raise Exception('Insufficient funds.')
return total_amount_in - total_amount_out
class BlockOutOfChain(Exception):
pass
class BlockVerificationFailed(Exception):
pass
class BlockVerifier:
def __init__(self, db):
self.db = db
self.tv = TxVerifier(db)
def verify(self, head, block):
total_block_reward = int(self.db.config['mining_reward'])
# verifying block hash
if int(block.hash(), 16) > (2 ** (256-self.db.config['difficulty'])):
raise BlockVerificationFailed('Block hash bigger than the target difficulty')
# verifying transactions in a block
for tx in block.txs[1:]:
fee = self.tv.verify(tx.inputs, tx.outputs)
total_block_reward += fee
total_reward_out = 0
for out in block.txs[0].outputs:
total_reward_out += out.amount
# verifying block reward
if total_block_reward != total_reward_out:
raise BlockVerificationFailed('Wrong reward sum')
# verifying some other things
if head:
if head.index >= block.index:
raise BlockOutOfChain('Block index number wrong')
if head.hash() != block.prev_hash:
raise BlockOutOfChain('New block not pointed to the head')
if head.timestamp > block.timestamp:
raise BlockOutOfChain('Block from the past')
return True
This is one of the main parts of our system, as we need to be sure that the data we get from other nodes is valid.
Previous Inputs to our Txs are tracked with an internal DB that is updated with each block, removing the need to walk through the whole blockchain (27GB for Bitcoin at the time of writing) to find the needed data. Basically, this is how the blockchain state is kept on the nodes in a network.
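The double-spend check behind `unspent_txs_by_user_hash` can be sketched like this (the simplified names `unspent_by_address`, `add_output`, and `spend` are mine; the real index also tracks amounts):

```python
from collections import defaultdict

# index of unspent outputs per address, updated block by block,
# so verification never has to scan the whole chain
unspent_by_address = defaultdict(set)   # address -> {(tx_hash, out_hash)}

def add_output(address, tx_hash, out_hash):
    unspent_by_address[address].add((tx_hash, out_hash))

def spend(address, tx_hash, out_hash):
    key = (tx_hash, out_hash)
    if key not in unspent_by_address[address]:
        raise Exception("Output of transaction already spent.")
    unspent_by_address[address].remove(key)

add_output("alice", "tx1", "out1")
spend("alice", "tx1", "out1")           # first spend succeeds
try:
    spend("alice", "tx1", "out1")       # second spend is rejected
    ok = False
except Exception:
    ok = True
```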
Blockchain
from .blocks import Block, Tx, Input, Output
from .verifiers import TxVerifier, BlockOutOfChain, BlockVerifier, BlockVerificationFailed
import logging
logger = logging.getLogger('Blockchain')
class Blockchain:
__slots__ = 'max_nonce', 'chain', 'unconfirmed_transactions', 'db', 'wallet', 'on_new_block', 'on_prev_block', 'current_block_transactions', 'fork_blocks'
def __init__(self, db, wallet, on_new_block=None, on_prev_block=None):
self.max_nonce = 2**32
self.db = db
self.wallet = wallet
self.on_new_block = on_new_block
self.on_prev_block = on_prev_block
self.unconfirmed_transactions = set()
self.current_block_transactions = set()
self.chain = []
self.fork_blocks = {}
def create_first_block(self):
"""
Creating first block in a chain. Only COINBASE Tx.
"""
tx = self.create_coinbase_tx()
block = Block([tx], 0, 0x0)
self.mine_block(block)
def create_coinbase_tx(self, fee=0):
inp = Input('COINBASE',0,self.wallet.address,0)
inp.sign(self.wallet)
out = Output(self.wallet.address, self.db.config['mining_reward']+fee, 0)
return Tx([inp],[out])
def is_valid_block(self, block):
bv = BlockVerifier(self.db)
return bv.verify(self.head, block)
def add_block(self, block):
if self.head and block.hash() == self.head.hash():
logger.error('Duplicate block')
return False
try:
self.is_valid_block(block)
except BlockOutOfChain:
# Here we cover the split-brain case only for the next 2 levels of blocks;
# with high difficulty it's a rare case, and more than 2 levels is much rarer.
if block.prev_hash == self.head.prev_hash:
logger.error('Split Brain detected')
self.fork_blocks[block.hash()] = block
return False
else:
for b_hash, b in self.fork_blocks.items():
if block.prev_hash == b_hash:
logger.error('Split Brain fixed. Longer chain chosen')
self.rollback_block()
self.chain.append(b)
self.chain.append(block)
self.fork_blocks = {}
return True
logger.error('Second Split Brain detected. Not programmed to fix this')
return False
except BlockVerificationFailed as e:
logger.error('Block verification failed: %s' % e)
return False
else:
self.chain.append(block)
self.fork_blocks = {}
logger.info(' Block added')
return True
logger.error('Hard chain out of sync')
def add_tx(self, tx):
if self.db.transaction_by_hash.get(tx.hash):
return False
tv = TxVerifier(self.db)
fee = tv.verify(tx.inputs, tx.outputs)
self.db.transaction_by_hash[tx.hash] = tx.as_dict
self.unconfirmed_transactions.add((fee, tx.hash))
return True
def force_block(self, check_stop=None):
'''
Force mining of a block. Gathers Txs up to some limit, taking the ones with bigger fees first.
'''
txs = sorted(self.unconfirmed_transactions, key=lambda x:-x[0])[:self.db.config['txs_per_block']]
self.current_block_transactions = set(txs)
fee = sum([v[0] for v in txs])
txs = [Tx.from_dict(self.db.transaction_by_hash[v[1]]) for v in txs ]
block = Block(
txs=[self.create_coinbase_tx(fee)] + txs,
index=self.head.index+1,
prev_hash=self.head.hash(),
)
self.mine_block(block, check_stop)
def rollover_block(self, block):
'''
As we use some sort of DB, we need a way to update it depending on whether we add or remove a block.
So we have 2 methods: rollover and rollback.
I also added callbacks in case some additional functionality should be built on top,
for example a blockchain analytics DB.
'''
self.unconfirmed_transactions -= self.current_block_transactions
self.db.block_index = block.index
for tx in block.txs:
self.db.transaction_by_hash[tx.hash] = tx.as_dict
for out in tx.outputs:
self.db.unspent_txs_by_user_hash[str(out.address)].add((tx.hash,out.hash))
self.db.unspent_outputs_amount[str(out.address)][out.hash] = int(out.amount)
for inp in tx.inputs:
if inp.prev_tx_hash == 'COINBASE':
continue
prev_out = self.db.transaction_by_hash[inp.prev_tx_hash]['outputs'][inp.output_index]
self.db.unspent_txs_by_user_hash[prev_out['address']].remove((inp.prev_tx_hash,prev_out['hash']))
del self.db.unspent_outputs_amount[prev_out['address']][prev_out['hash']]
if self.on_new_block:
self.on_new_block(block, self.db)
self.current_block_transactions = set()
def rollback_block(self):
block = self.chain.pop()
self.db.block_index -= 1
total_amount_in = 0
total_amount_out = 0
for tx in block.txs:
# removing new unspent outputs
for out in tx.outputs:
self.db.unspent_txs_by_user_hash[str(out.address)].remove((tx.hash,out.hash))
del self.db.unspent_outputs_amount[str(out.address)][out.hash]
total_amount_out += out.amount
# adding back previous unspent outputs
for inp in tx.inputs:
if inp.prev_tx_hash == 'COINBASE':
continue
prev_out = self.db.transaction_by_hash[inp.prev_tx_hash]['outputs'][inp.output_index]
self.db.unspent_txs_by_user_hash[prev_out['address']].add((inp.prev_tx_hash,prev_out['hash']))
self.db.unspent_outputs_amount[prev_out['address']][prev_out['hash']] = prev_out['amount']
total_amount_in += int(prev_out['amount'])
# adding the Tx back to the unprocessed stack
fee = total_amount_in - total_amount_out
self.unconfirmed_transactions.add((fee,tx.hash))
if self.on_prev_block:
self.on_prev_block(block, self.db)
def mine_block(self, block, check_stop=None):
'''
Mine a block with ability to stop in case if check callback return True
'''
for n in range(self.max_nonce):
if check_stop and check_stop():
logger.error('Mining interrupted.')
return
if int(block.hash(nonce=n), 16) <= (2 ** (256-self.db.config['difficulty'])):
self.add_block(block)
self.rollover_block(block)
logger.info(' Block mined at nonce: %s' % n)
break
@property
def head(self):
if not self.chain:
return None
return self.chain[-1]
@property
def blockchain(self):
return [el.as_dict for el in reversed(self.chain)]
The force_block method runs mining of a new block by gathering some number of Txs ordered by fee and adding a COINBASE Tx to them.
After a block is added to the chain we use rollover_block to update our DB with the new data.
When a new block (received from a different node) creates a Split Brain, we use rollback_block to roll the chain back to the previous block and merge in the new longest chain (the code does not support multi-level split brain, as it is almost impossible in the real world).
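The rollback-and-adopt step can be sketched on plain lists (the block names like `b2a`/`b2b` are illustrative):

```python
def resolve_fork(chain, fork_head, fork_tail):
    """Toy longest-chain rule: when a competing branch gains a block on top
    of its fork head, roll back our tip and adopt the longer branch."""
    rolled_back = chain.pop()            # rollback_block analogue
    chain.append(fork_head)              # the competing block at our old height
    chain.append(fork_tail)              # the block that made that branch longer
    return rolled_back

chain = ["genesis", "b1", "b2a"]         # our view of the chain
rolled = resolve_fork(chain, "b2b", "b3b")  # the other branch reached height 3 first
```

The transactions of the rolled-back block go back into the unconfirmed pool, which is what rollback_block does with `unconfirmed_transactions`.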
There are also some tests to verify that the blockchain code works normally.
Now we need to make this run concurrently.
Creating a full node with FastAPI
I used FastAPI as it's fast, simple, uses asyncio, and can build an OpenAPI schema and debug UI from code (which is awesome).
from fastapi import FastAPI, BackgroundTasks, Request
import uvicorn
import requests
import asyncio
import logging
import sys
from models import *
from blockchain.db import DB
from blockchain.blockchain import Blockchain
from blockchain.wallet import Wallet
from blockchain.api import API
from blockchain.blocks import Input, Output, Tx
# Custom formatter
class ColorFormatter(logging.Formatter):
def __init__(self, fmt="%(asctime)s - Blockchain - %(message)s"):
super(ColorFormatter,self).__init__(fmt)
red = '\033[0;31m'
nc = '\033[0m'
cyan = '\033[0;96m'
err_fmt = f"{red}%(asctime)s - Blockchain{nc} - %(message)s"
info_fmt = f"{cyan}%(asctime)s - Blockchain{nc} - %(message)s"
self.err = logging.Formatter(err_fmt)
self.log = logging.Formatter(info_fmt)
def format(self, record):
if record.levelno == logging.ERROR:
return self.err.format(record)
else:
return self.log.format(record)
logger = logging.getLogger("Blockchain")
app = FastAPI()
app.config = {}
app.jobs = {}
### TASKS
def sync_data():
logger.info('================== Sync started =================')
bc = app.config['api']
head = bc.get_head()
while True:
sync_running = False
for node in app.config['nodes']:
if node == ('%s:%s' % (app.config['host'],app.config['port'])):
continue
url = 'http://%s/chain/sync' % node
start = head['index']+1 if head else 0
while True:
logger.info('%s %s', url, {"from_block":start, "limit":20})
res = requests.get(url, params={"from_block":start, "limit":20})
if res.status_code == 200:
data = res.json()
if not data:
break
sync_running = True
for block in data:
try:
bc.add_block(block)
except Exception as e:
logger.exception(e)
return
else:
logger.info(f"Block added: #{block['index']}")
start += 20
head = bc.get_head()
if not sync_running:
app.config['sync_running'] = False
logger.info('================== Sync stopped =================')
return
def broadcast(path, data, params=False, filter_host=None):
for node in list(app.config['nodes'])[:]:
if node == ('%s:%s' % (app.config['host'],app.config['port'])) or filter_host == node:
continue
url = 'http://%s%s' % (node,path)
logger.info(f'Sending broadcast {url} except: {filter_host}')
try:
# the header is added here because we run all the nodes on one domain and need a way to identify the sender node
# so we don't create a broadcast loop
if params:
requests.post(url, params=data, timeout=2, headers={'node':'%s:%s' % (app.config['host'],app.config['port'])})
else:
requests.post(url, json=data, timeout=2, headers={'node':'%s:%s' % (app.config['host'],app.config['port'])})
except requests.RequestException:
pass
def mine(event):
logger.info('>>>>>>>>>> Starting mining loop')
# In a real system you should not do it like this: the mining script should run in a separate process
while True:
try:
def check_stop():
return event.is_set()
logger.info(f'>> Starting new block mining')
app.config['api'].mine_block(check_stop)
logger.info(f'>> New block mined')
broadcast('/chain/add_block', app.config['api'].get_head())
if event.is_set():
return
except asyncio.CancelledError:
logger.info('>>>>>>>>>> Mining loop stopped')
return
except Exception as e:
logger.exception(e)
### SERVER OPERATIONS
@app.post("/chain/stop-mining")
async def stop_mining():
if app.jobs.get('mining'):
app.jobs['mining'].set()
app.jobs['mining'] = None
@app.post("/chain/start-mining")
async def start_mining():
if not app.jobs.get('mining'):
loop = asyncio.get_running_loop()
app.jobs['mining'] = asyncio.Event()
loop.run_in_executor(None, mine, app.jobs['mining'])
@app.get("/server/nodes")
async def get_nodes():
return app.config['nodes']
@app.post("/server/add_nodes")
async def add_nodes(nodes:NodesModel, request: Request):
length = len(app.config['nodes'])
app.config['nodes'] |= set(nodes.nodes)
if length < len(app.config['nodes']):
broadcast('/server/add_nodes', {'nodes':list(app.config['nodes'])}, False, request.headers.get('node'))
logger.info(f'New nodes added: {nodes.nodes}')
return {"success":True}
### DEMO OPERATIONS
@app.get("/demo/send_amount")
async def send_amount(address_to:str, amount:int, background_tasks: BackgroundTasks):
'''Sending amount of coins from server wallet to some other wallet'''
address_from = app.config['wallet'].address
wallet = app.config['wallet']
bc = app.config['api']
unspent_txs = bc.get_user_unspent_txs(address_from)
total = 0
inputs = []
i = 0
try:
while total < amount:
prev = unspent_txs[i]
inp = Input(prev['tx'],prev['output_index'],address_from,i)
inp.sign(wallet)
total += prev['amount']
i += 1
inputs.append(inp)
except Exception as e:
return {"success":False, "msg":str(e)}
outs = [Output(address_to, amount, 0)]
if total - amount > 0:
outs.append(Output(address_from, total - amount, 1))
tx = Tx(inputs,outs)
try:
res = bc.add_tx(tx.as_dict)
except Exception as e:
logger.exception(e)
return {"success":False, "msg":str(e)}
else:
if res:
logger.info(f'Tx added to the stack')
background_tasks.add_task(broadcast, '/chain/tx_create', tx.as_dict, False)
return {"success":True}
logger.info('Tx already in stack. Skipped.')
return {"success":False, "msg":"Duplicate"}
### ON CHAIN OPERATIONS
@app.get("/chain/get_amount")
async def get_wallet(address):
bc = app.config['api']
return {"address": address, "amount":bc.get_user_balance(address)}
@app.get("/chain/get_unspent_tx")
async def get_unspent_tx(address):
bc = app.config['api']
return {"address": address, "tx":bc.get_user_unspent_txs(address)}
@app.get("/chain/status")
async def status():
bc = app.config['api']
head = bc.get_head()
if not head:
return {'empty_node':True}
return {
'block_index':head['index'],
'block_prev_hash':head['prev_hash'],
'block_hash':head['hash'],
'timestamp':head['timestamp']
}
@app.get("/chain/sync")
async def sync(from_block:int, limit:int=20):
bc = app.config['api']
return bc.get_chain(from_block, limit)
@app.post("/chain/add_block")
async def add_block(block:BlockModel, background_tasks: BackgroundTasks, request: Request):
logger.info(f"New block arrived: #{block.index} from {request.headers.get('node')}")
if app.config['sync_running']:
logger.error(f'################### Not added, cause sync is running')
return {"success":False, "msg":'Out of sync'}
bc = app.config['api']
head = bc.get_head()
if (head['index'] + 1) < block.index:
app.config['sync_running'] = True
background_tasks.add_task(sync_data)
logger.error(f'################### Not added, cause node out of sync.')
return {"success":False, "msg":'Out of sync'}
try:
res = bc.add_block(block.dict())
if res: restart_miner()
except Exception as e:
logger.exception(e)
return {"success":False, "msg":str(e)}
else:
if res:
logger.info('Block added to the chain')
background_tasks.add_task(broadcast, '/chain/add_block', block.dict(), False, request.headers.get('node'))
return {"success":True}
logger.info('Old block. Skipped.')
return {"success":False, "msg":"Duplicate"}
@app.post("/chain/tx_create")
async def add_tx(tx: TxModel, background_tasks: BackgroundTasks, request: Request):
logger.info(f'New Tx arrived')
bc = app.config['api']
try:
res = bc.add_tx(tx.dict())
except Exception as e:
logger.exception(e)
return {"success":False, "msg":str(e)}
else:
if res:
logger.info(f'Tx added to the stack')
background_tasks.add_task(broadcast, '/chain/tx_create', tx.dict(), False, request.headers.get('node'))
return {"success":True}
logger.info('Tx already in stack. Skipped.')
return {"success":False, "msg":"Duplicate"}
@app.on_event("startup")
async def on_startup():
app.config['sync_running'] = True
loop = asyncio.get_running_loop()
# sync data before run the node
await loop.run_in_executor(None, sync_data)
# add our node address to connected node to broadcast around network
loop.run_in_executor(None, broadcast, '/server/add_nodes', {'nodes':['%s:%s' % (app.config['host'],app.config['port'])]}, False)
if app.config['mine']:
app.jobs['mining'] = asyncio.Event()
loop.run_in_executor(None, mine, app.jobs['mining'])
@app.on_event("shutdown")
async def on_shutdown():
if app.jobs.get('mining'):
app.jobs.get('mining').set()
#### Utils ###########################
def restart_miner():
if app.jobs.get('mining'):
loop = asyncio.get_running_loop()
app.jobs['mining'].set()
app.jobs['mining'] = asyncio.Event()
loop.run_in_executor(None, mine, app.jobs['mining'])
if __name__ == "__main__":
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(ColorFormatter())
handler.setLevel(logging.INFO)
logger.addHandler(handler)
import argparse
parser = argparse.ArgumentParser(description='Blockchain full node.')
parser.add_argument('--node', type=str, help='Address of a node to connect to. If not set, this will init the first node.')
parser.add_argument('--port', required=True, type=int, help='Port on which run the node.')
parser.add_argument('--mine', required=False, type=bool, help='Run the miner on this node.')
parser.add_argument('--diff', required=False, type=int, help='Difficulty')
args = parser.parse_args()
_DB = DB()
if args.diff:
_DB.config['difficulty'] = args.diff
_W = Wallet.create()
_BC = Blockchain(_DB, _W)
_API = API(_BC)
logger.info(' ####### Server address: %s ########' %_W.address)
app.config['db'] = _DB
app.config['wallet'] = _W
app.config['bc'] = _BC
app.config['api'] = _API
app.config['port'] = args.port
app.config['host'] = '127.0.0.1'
app.config['nodes'] = set([args.node]) if args.node else set(['127.0.0.1:%s' % args.port])
app.config['sync_running'] = False
app.config['mine'] = args.mine
if not args.node:
_BC.create_first_block()
uvicorn.run(app, host="127.0.0.1", port=args.port, access_log=True)
To wrap some additional functionality around the blockchain code I added an API layer between the node and the blockchain.
Here we have 3 background tasks: mine, broadcast, and sync_data.
We should run mining as a separate process, but that would add more code, so right now it just runs in a thread, which is OK for the test. Broadcast is used to spread data across the known nodes. And data sync fetches the blockchain from another node on start, or when we receive a block that is ahead of us.
Mining runs without stopping, block after block; if we don't add any Txs, a block will contain only the coinbase transaction.
If we get a new block before mining of the current block ends, we stop mining and continue from the new block.
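That interruption mechanism can be sketched with a threading.Event standing in for the asyncio.Event used above (a toy busy loop instead of real hashing):

```python
import threading
import time

stop = threading.Event()

def mine_loop():
    # check_stop analogue: poll the event between hash attempts
    while not stop.is_set():
        pass  # stand-in for one hash attempt

t = threading.Thread(target=mine_loop)
t.start()
time.sleep(0.05)      # miner works for a while...
stop.set()            # ...then a fresh block arrives: abort the current attempt
t.join(timeout=1)
```

In the node, restart_miner does the same thing: it sets the old event so the running miner exits, then starts a new mining loop with a fresh event on top of the new head.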
All duplicate Txs and Blocks are discarded and not rebroadcast to the network.